diff --git a/spec/http_client_pool_spec.cr b/spec/http_client_pool_spec.cr new file mode 100644 index 0000000..2efca77 --- /dev/null +++ b/spec/http_client_pool_spec.cr @@ -0,0 +1,52 @@ +require "./spec_helper" +require "./support/http" + +describe DB::Pool do + it "distributes evenly the requests" do + mutex = Mutex.new + requests_per_connection = Hash(Socket::Address, Int32).new + + server = HTTP::Server.new do |context| + remote_address = context.request.remote_address.not_nil! + mutex.synchronize do + requests_per_connection[remote_address] ||= 0 + requests_per_connection[remote_address] += 1 + end + sleep context.request.query_params["delay"].to_f + context.response.print "ok" + end + address = server.bind_unused_port "127.0.0.1" + + run_server(server) do + fixed_pool_size = 5 + expected_per_connection = 5 + requests = fixed_pool_size * expected_per_connection + + pool = DB::Pool.new( + initial_pool_size: fixed_pool_size, + max_pool_size: fixed_pool_size, + max_idle_pool_size: fixed_pool_size) { + HTTP::Client.new(URI.parse("http://127.0.0.1:#{address.port}/")) + } + + done = Channel(Nil).new + + requests.times do + spawn do + pool.checkout do |http| + http.get("/?delay=0.1") + end + done.send(nil) + end + end + + spawn do + requests.times { done.receive } + done.close + end + wait_for { done.closed? } + + requests_per_connection.values.should eq([expected_per_connection] * fixed_pool_size) + end + end +end diff --git a/spec/support/fibers.cr b/spec/support/fibers.cr new file mode 100644 index 0000000..50e5b38 --- /dev/null +++ b/spec/support/fibers.cr @@ -0,0 +1,16 @@ +def wait_until_blocked(f : Fiber, timeout = 5.seconds) + now = Time.monotonic + + until f.resumable? + Fiber.yield + raise "fiber failed to block within #{timeout}" if (Time.monotonic - now) > timeout + end +end + +def wait_until_finished(f : Fiber, timeout = 5.seconds) + now = Time.monotonic + until f.dead? + Fiber.yield + raise "fiber failed to finish within #{timeout}" if (Time.monotonic - now) > timeout + end +end diff --git a/spec/support/http.cr b/spec/support/http.cr new file mode 100644 index 0000000..cdc05e1 --- /dev/null +++ b/spec/support/http.cr @@ -0,0 +1,48 @@ +require "http" +require "./fibers" + +def wait_for(timeout = 5.seconds) + now = Time.monotonic + + until yield + Fiber.yield + + if (Time.monotonic - now) > timeout + raise "block failed to evaluate to true within #{timeout}" + end + end +end + +# Helper method which runs *server* +# 1. Spawns `server.listen` in a new fiber. +# 2. Waits until `server.listening?`. +# 3. Yields to the given block. +# 4. Ensures the server is closed. +# 5. After returning from the block, it waits for the server to gracefully +# shut down before continuing execution in the current fiber. +# 6. If the listening fiber raises an exception, it is rescued and re-raised +# in the current fiber. +def run_server(server) + server_done = Channel(Exception?).new + + f = spawn do + server.listen + rescue exc + server_done.send exc + else + server_done.send nil + end + + begin + wait_for { server.listening? } + wait_until_blocked f + + yield server_done + ensure + server.close unless server.closed? + + if exc = server_done.receive + raise exc + end + end +end diff --git a/src/db/error.cr b/src/db/error.cr index a304310..5923479 100644 --- a/src/db/error.cr +++ b/src/db/error.cr @@ -1,4 +1,7 @@ module DB + abstract class Connection + end + class Error < Exception end @@ -11,20 +14,29 @@ module DB class PoolRetryAttemptsExceeded < Error end + class PoolResourceLost(T) < Error + getter resource : T + + def initialize(@resource : T) + end + end + + class PoolResourceRefused < Error + end + # Raised when an established connection is lost # probably due to socket/network issues. # It is used by the connection pool retry logic. - class ConnectionLost < Error - getter connection : Connection - - def initialize(@connection) + class ConnectionLost < PoolResourceLost(Connection) + def connection + resource end end # Raised when a connection is unable to be established # probably due to socket/network or configuration issues. # It is used by the connection pool retry logic. - class ConnectionRefused < Error + class ConnectionRefused < PoolResourceRefused end class Rollback < Error diff --git a/src/db/pool.cr b/src/db/pool.cr index 75a8f93..bb885e3 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -1,5 +1,7 @@ require "weak_ref" +require "./error" + module DB class Pool(T) # Pool configuration @@ -52,12 +54,19 @@ module DB @idle.clear end - record Stats, open_connections : Int32 + record Stats, + open_connections : Int32, + idle_connections : Int32, + in_flight_connections : Int32, + max_connections : Int32 # Returns stats of the pool def stats Stats.new( - open_connections: @total.size + open_connections: @total.size, + idle_connections: @idle.size, + in_flight_connections: @inflight, + max_connections: @max_pool_size, ) end @@ -91,10 +100,22 @@ module DB resource end - res.before_checkout + if res.responds_to?(:before_checkout) + res.before_checkout + end res end + def checkout(&block : T ->) + connection = checkout + + begin + yield connection + ensure + release connection + end + end + # ``` # selected, is_candidate = pool.checkout_some(candidates) # ``` @@ -122,7 +143,9 @@ module DB sync do if can_increase_idle_pool @idle << resource - resource.after_release + if resource.responds_to?(:after_release) + resource.after_release + end idle_pushed = true else resource.close @@ -153,12 +176,12 @@ module DB begin sleep @retry_delay if i >= current_available return yield - rescue e : ConnectionLost + rescue e : PoolResourceLost(T) # if the connection is lost close it to release resources # and remove it from the known pool. - sync { delete(e.connection) } - e.connection.close - rescue e : ConnectionRefused + sync { delete(e.resource) } + e.resource.close + rescue e : PoolResourceRefused # a ConnectionRefused means a new connection # was intended to be created # nothing to due but to retry soon @@ -215,7 +238,7 @@ module DB sync_dec_waiting_resource when timeout(@checkout_timeout.seconds) sync_dec_waiting_resource - raise DB::PoolTimeout.new + raise DB::PoolTimeout.new("Could not check out a connection in #{@checkout_timeout} seconds") end end {% else %} @@ -232,7 +255,7 @@ module DB sync_dec_waiting_resource when 1 sync_dec_waiting_resource - raise DB::PoolTimeout.new + raise DB::PoolTimeout.new("Could not check out a connection in #{@checkout_timeout} seconds") else raise DB::Error.new end