mirror of
https://gitea.invidious.io/iv-org/shard-crystal-db.git
synced 2024-08-15 00:53:32 +00:00
Introduce DB::Pool::Options
This commit is contained in:
parent
36f0a11d07
commit
b3683283d2
5 changed files with 67 additions and 28 deletions
|
@ -22,10 +22,10 @@ describe DB::Pool do
|
||||||
expected_per_connection = 5
|
expected_per_connection = 5
|
||||||
requests = fixed_pool_size * expected_per_connection
|
requests = fixed_pool_size * expected_per_connection
|
||||||
|
|
||||||
pool = DB::Pool.new(
|
pool = DB::Pool.new(DB::Pool::Options.new(
|
||||||
initial_pool_size: fixed_pool_size,
|
initial_pool_size: fixed_pool_size,
|
||||||
max_pool_size: fixed_pool_size,
|
max_pool_size: fixed_pool_size,
|
||||||
max_idle_pool_size: fixed_pool_size) {
|
max_idle_pool_size: fixed_pool_size)) {
|
||||||
HTTP::Client.new(URI.parse("http://127.0.0.1:#{address.port}/"))
|
HTTP::Client.new(URI.parse("http://127.0.0.1:#{address.port}/"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -57,15 +57,19 @@ class Closable
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
private def create_pool(**options, &factory : -> T) forall T
|
||||||
|
DB::Pool.new(DB::Pool::Options.new(**options), &factory)
|
||||||
|
end
|
||||||
|
|
||||||
describe DB::Pool do
|
describe DB::Pool do
|
||||||
it "should use proc to create objects" do
|
it "should use proc to create objects" do
|
||||||
block_called = 0
|
block_called = 0
|
||||||
pool = DB::Pool.new(initial_pool_size: 3) { block_called += 1; Closable.new }
|
pool = create_pool(initial_pool_size: 3) { block_called += 1; Closable.new }
|
||||||
block_called.should eq(3)
|
block_called.should eq(3)
|
||||||
end
|
end
|
||||||
|
|
||||||
it "should get resource" do
|
it "should get resource" do
|
||||||
pool = DB::Pool.new { Closable.new }
|
pool = create_pool { Closable.new }
|
||||||
resource = pool.checkout
|
resource = pool.checkout
|
||||||
resource.should be_a Closable
|
resource.should be_a Closable
|
||||||
resource.before_checkout_called.should be_true
|
resource.before_checkout_called.should be_true
|
||||||
|
@ -73,18 +77,18 @@ describe DB::Pool do
|
||||||
|
|
||||||
it "should be available if not checkedout" do
|
it "should be available if not checkedout" do
|
||||||
resource = uninitialized Closable
|
resource = uninitialized Closable
|
||||||
pool = DB::Pool.new(initial_pool_size: 1) { resource = Closable.new }
|
pool = create_pool(initial_pool_size: 1) { resource = Closable.new }
|
||||||
pool.is_available?(resource).should be_true
|
pool.is_available?(resource).should be_true
|
||||||
end
|
end
|
||||||
|
|
||||||
it "should not be available if checkedout" do
|
it "should not be available if checkedout" do
|
||||||
pool = DB::Pool.new { Closable.new }
|
pool = create_pool { Closable.new }
|
||||||
resource = pool.checkout
|
resource = pool.checkout
|
||||||
pool.is_available?(resource).should be_false
|
pool.is_available?(resource).should be_false
|
||||||
end
|
end
|
||||||
|
|
||||||
it "should be available if returned" do
|
it "should be available if returned" do
|
||||||
pool = DB::Pool.new { Closable.new }
|
pool = create_pool { Closable.new }
|
||||||
resource = pool.checkout
|
resource = pool.checkout
|
||||||
resource.after_release_called.should be_false
|
resource.after_release_called.should be_false
|
||||||
pool.release resource
|
pool.release resource
|
||||||
|
@ -93,7 +97,7 @@ describe DB::Pool do
|
||||||
end
|
end
|
||||||
|
|
||||||
it "should wait for available resource" do
|
it "should wait for available resource" do
|
||||||
pool = DB::Pool.new(max_pool_size: 1, initial_pool_size: 1) { Closable.new }
|
pool = create_pool(max_pool_size: 1, initial_pool_size: 1) { Closable.new }
|
||||||
|
|
||||||
b_cnn_request = ShouldSleepingOp.new
|
b_cnn_request = ShouldSleepingOp.new
|
||||||
wait_a = WaitFor.new
|
wait_a = WaitFor.new
|
||||||
|
@ -121,7 +125,7 @@ describe DB::Pool do
|
||||||
|
|
||||||
it "should create new if max was not reached" do
|
it "should create new if max was not reached" do
|
||||||
block_called = 0
|
block_called = 0
|
||||||
pool = DB::Pool.new(max_pool_size: 2, initial_pool_size: 1) { block_called += 1; Closable.new }
|
pool = create_pool(max_pool_size: 2, initial_pool_size: 1) { block_called += 1; Closable.new }
|
||||||
block_called.should eq 1
|
block_called.should eq 1
|
||||||
pool.checkout
|
pool.checkout
|
||||||
block_called.should eq 1
|
block_called.should eq 1
|
||||||
|
@ -131,7 +135,7 @@ describe DB::Pool do
|
||||||
|
|
||||||
it "should reuse returned resources" do
|
it "should reuse returned resources" do
|
||||||
all = [] of Closable
|
all = [] of Closable
|
||||||
pool = DB::Pool.new(max_pool_size: 2, initial_pool_size: 1) { Closable.new.tap { |c| all << c } }
|
pool = create_pool(max_pool_size: 2, initial_pool_size: 1) { Closable.new.tap { |c| all << c } }
|
||||||
pool.checkout
|
pool.checkout
|
||||||
b1 = pool.checkout
|
b1 = pool.checkout
|
||||||
pool.release b1
|
pool.release b1
|
||||||
|
@ -143,7 +147,7 @@ describe DB::Pool do
|
||||||
|
|
||||||
it "should close available and total" do
|
it "should close available and total" do
|
||||||
all = [] of Closable
|
all = [] of Closable
|
||||||
pool = DB::Pool.new(max_pool_size: 2, initial_pool_size: 1) { Closable.new.tap { |c| all << c } }
|
pool = create_pool(max_pool_size: 2, initial_pool_size: 1) { Closable.new.tap { |c| all << c } }
|
||||||
a = pool.checkout
|
a = pool.checkout
|
||||||
b = pool.checkout
|
b = pool.checkout
|
||||||
pool.release b
|
pool.release b
|
||||||
|
@ -157,7 +161,7 @@ describe DB::Pool do
|
||||||
end
|
end
|
||||||
|
|
||||||
it "should timeout" do
|
it "should timeout" do
|
||||||
pool = DB::Pool.new(max_pool_size: 1, checkout_timeout: 0.1) { Closable.new }
|
pool = create_pool(max_pool_size: 1, checkout_timeout: 0.1) { Closable.new }
|
||||||
pool.checkout
|
pool.checkout
|
||||||
expect_raises DB::PoolTimeout do
|
expect_raises DB::PoolTimeout do
|
||||||
pool.checkout
|
pool.checkout
|
||||||
|
@ -165,7 +169,7 @@ describe DB::Pool do
|
||||||
end
|
end
|
||||||
|
|
||||||
it "should be able to release after a timeout" do
|
it "should be able to release after a timeout" do
|
||||||
pool = DB::Pool.new(max_pool_size: 1, checkout_timeout: 0.1) { Closable.new }
|
pool = create_pool(max_pool_size: 1, checkout_timeout: 0.1) { Closable.new }
|
||||||
a = pool.checkout
|
a = pool.checkout
|
||||||
pool.checkout rescue nil
|
pool.checkout rescue nil
|
||||||
pool.release a
|
pool.release a
|
||||||
|
@ -173,7 +177,7 @@ describe DB::Pool do
|
||||||
|
|
||||||
it "should close if max idle amount is reached" do
|
it "should close if max idle amount is reached" do
|
||||||
all = [] of Closable
|
all = [] of Closable
|
||||||
pool = DB::Pool.new(max_pool_size: 3, max_idle_pool_size: 1) { Closable.new.tap { |c| all << c } }
|
pool = create_pool(max_pool_size: 3, max_idle_pool_size: 1) { Closable.new.tap { |c| all << c } }
|
||||||
pool.checkout
|
pool.checkout
|
||||||
pool.checkout
|
pool.checkout
|
||||||
pool.checkout
|
pool.checkout
|
||||||
|
@ -191,7 +195,7 @@ describe DB::Pool do
|
||||||
end
|
end
|
||||||
|
|
||||||
it "should not return closed resources to the pool" do
|
it "should not return closed resources to the pool" do
|
||||||
pool = DB::Pool.new(max_pool_size: 1, max_idle_pool_size: 1) { Closable.new }
|
pool = create_pool(max_pool_size: 1, max_idle_pool_size: 1) { Closable.new }
|
||||||
|
|
||||||
# pool size 1 should be reusing the one resource
|
# pool size 1 should be reusing the one resource
|
||||||
resource1 = pool.checkout
|
resource1 = pool.checkout
|
||||||
|
@ -209,7 +213,7 @@ describe DB::Pool do
|
||||||
|
|
||||||
it "should create resource after max_pool was reached if idle forced some close up" do
|
it "should create resource after max_pool was reached if idle forced some close up" do
|
||||||
all = [] of Closable
|
all = [] of Closable
|
||||||
pool = DB::Pool.new(max_pool_size: 3, max_idle_pool_size: 1) { Closable.new.tap { |c| all << c } }
|
pool = create_pool(max_pool_size: 3, max_idle_pool_size: 1) { Closable.new.tap { |c| all << c } }
|
||||||
pool.checkout
|
pool.checkout
|
||||||
pool.checkout
|
pool.checkout
|
||||||
pool.checkout
|
pool.checkout
|
||||||
|
|
|
@ -53,7 +53,7 @@ module DB
|
||||||
@setup_connection = ->(conn : Connection) {}
|
@setup_connection = ->(conn : Connection) {}
|
||||||
factory = @driver.connection_builder(@uri)
|
factory = @driver.connection_builder(@uri)
|
||||||
@pool = uninitialized Pool(Connection) # in order to use self in the factory proc
|
@pool = uninitialized Pool(Connection) # in order to use self in the factory proc
|
||||||
@pool = Pool.new(**pool_options) {
|
@pool = Pool.new(pool_options) {
|
||||||
conn = factory.call
|
conn = factory.call
|
||||||
conn.auto_release = false
|
conn.auto_release = false
|
||||||
conn.context = self
|
conn.context = self
|
||||||
|
|
|
@ -36,15 +36,8 @@ module DB
|
||||||
Connection::Options.from_http_params(params)
|
Connection::Options.from_http_params(params)
|
||||||
end
|
end
|
||||||
|
|
||||||
def connection_pool_options(params : HTTP::Params)
|
def connection_pool_options(params : HTTP::Params) : Pool::Options
|
||||||
{
|
Pool::Options.from_http_params(params)
|
||||||
initial_pool_size: params.fetch("initial_pool_size", 1).to_i,
|
|
||||||
max_pool_size: params.fetch("max_pool_size", 0).to_i,
|
|
||||||
max_idle_pool_size: params.fetch("max_idle_pool_size", 1).to_i,
|
|
||||||
checkout_timeout: params.fetch("checkout_timeout", 5.0).to_f,
|
|
||||||
retry_attempts: params.fetch("retry_attempts", 1).to_i,
|
|
||||||
retry_delay: params.fetch("retry_delay", 1.0).to_f,
|
|
||||||
}
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -4,6 +4,31 @@ require "./error"
|
||||||
|
|
||||||
module DB
|
module DB
|
||||||
class Pool(T)
|
class Pool(T)
|
||||||
|
record Options,
|
||||||
|
# initial number of connections in the pool
|
||||||
|
initial_pool_size : Int32 = 1,
|
||||||
|
# maximum amount of connections in the pool (Idle + InUse)
|
||||||
|
max_pool_size : Int32 = 0,
|
||||||
|
# maximum amount of idle connections in the pool
|
||||||
|
max_idle_pool_size : Int32 = 1,
|
||||||
|
# seconds to wait before timeout while doing a checkout
|
||||||
|
checkout_timeout : Float64 = 5.0,
|
||||||
|
# maximum amount of retry attempts to reconnect to the db. See `Pool#retry`
|
||||||
|
retry_attempts : Int32 = 1,
|
||||||
|
# seconds to wait before a retry attempt
|
||||||
|
retry_delay : Float64 = 0.2 do
|
||||||
|
def self.from_http_params(params : HTTP::Params, default = Options.new)
|
||||||
|
Options.new(
|
||||||
|
initial_pool_size: params.fetch("initial_pool_size", default.initial_pool_size).to_i,
|
||||||
|
max_pool_size: params.fetch("max_pool_size", default.max_pool_size).to_i,
|
||||||
|
max_idle_pool_size: params.fetch("max_idle_pool_size", default.max_idle_pool_size).to_i,
|
||||||
|
checkout_timeout: params.fetch("checkout_timeout", default.checkout_timeout).to_f,
|
||||||
|
retry_attempts: params.fetch("retry_attempts", default.retry_attempts).to_i,
|
||||||
|
retry_delay: params.fetch("retry_delay", default.retry_delay).to_f,
|
||||||
|
)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
# Pool configuration
|
# Pool configuration
|
||||||
|
|
||||||
# initial number of connections in the pool
|
# initial number of connections in the pool
|
||||||
|
@ -37,8 +62,25 @@ module DB
|
||||||
# global pool mutex
|
# global pool mutex
|
||||||
@mutex : Mutex
|
@mutex : Mutex
|
||||||
|
|
||||||
def initialize(@initial_pool_size = 1, @max_pool_size = 0, @max_idle_pool_size = 1, @checkout_timeout = 5.0,
|
@[Deprecated("Use `#new` with DB::Pool::Options instead")]
|
||||||
@retry_attempts = 1, @retry_delay = 0.2, &@factory : -> T)
|
def initialize(initial_pool_size = 1, max_pool_size = 0, max_idle_pool_size = 1, checkout_timeout = 5.0,
|
||||||
|
retry_attempts = 1, retry_delay = 0.2, &factory : -> T)
|
||||||
|
initialize(
|
||||||
|
Options.new(
|
||||||
|
initial_pool_size: initial_pool_size, max_pool_size: max_pool_size,
|
||||||
|
max_idle_pool_size: max_idle_pool_size, checkout_timeout: checkout_timeout,
|
||||||
|
retry_attempts: retry_attempts, retry_delay: retry_delay),
|
||||||
|
&factory)
|
||||||
|
end
|
||||||
|
|
||||||
|
def initialize(pool_options : Options = Options.new, &@factory : -> T)
|
||||||
|
@initial_pool_size = pool_options.initial_pool_size
|
||||||
|
@max_pool_size = pool_options.max_pool_size
|
||||||
|
@max_idle_pool_size = pool_options.max_idle_pool_size
|
||||||
|
@checkout_timeout = pool_options.checkout_timeout
|
||||||
|
@retry_attempts = pool_options.retry_attempts
|
||||||
|
@retry_delay = pool_options.retry_delay
|
||||||
|
|
||||||
@availability_channel = Channel(Nil).new
|
@availability_channel = Channel(Nil).new
|
||||||
@waiting_resource = 0
|
@waiting_resource = 0
|
||||||
@inflight = 0
|
@inflight = 0
|
||||||
|
|
Loading…
Reference in a new issue