From 9275b22a304ccdafc8a628bc5fe4fbd72d0c4dd5 Mon Sep 17 00:00:00 2001 From: "Brian J. Cardiff" Date: Tue, 20 Aug 2019 16:23:15 -0300 Subject: [PATCH 1/3] Add pool stats --- src/db/pool.cr | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/db/pool.cr b/src/db/pool.cr index 575c512..87aea91 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -28,6 +28,15 @@ module DB @available.clear end + record Stats, open_connections : Int32 + + # Returns stats of the pool + def stats + Stats.new( + open_connections: @total.size + ) + end + def checkout : T resource = if @available.empty? if can_increase_pool From 5c1c1ab910430c0c8ca07f181d12d55210ca8d44 Mon Sep 17 00:00:00 2001 From: "Brian J. Cardiff" Date: Tue, 20 Aug 2019 16:20:58 -0300 Subject: [PATCH 2/3] Handle inflight connections and rework state sync --- src/db/pool.cr | 176 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 117 insertions(+), 59 deletions(-) diff --git a/src/db/pool.cr b/src/db/pool.cr index 87aea91..7046cb0 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -2,30 +2,54 @@ require "weak_ref" module DB class Pool(T) + # Pool configuration + + # initial number of connections in the pool @initial_pool_size : Int32 - # maximum amount of objects in the pool. Either available or in use. + # maximum amount of connections in the pool (Idle + InUse) @max_pool_size : Int32 - @available = Set(T).new - @total = [] of T + # maximum amount of idle connections in the pool + @max_idle_pool_size : Int32 + # seconds to wait before timeout while doing a checkout @checkout_timeout : Float64 - # maximum amount of retry attempts to reconnect to the db. See `Pool#retry`. + # maximum amount of retry attempts to reconnect to the db. See `Pool#retry` @retry_attempts : Int32 + # seconds to wait before a retry attempt @retry_delay : Float64 + # Pool state + + # total of open connections managed by this pool + @total = [] of T + # connections available for checkout + @idle = Set(T).new + # connections waiting to be stablished (they are not in *@idle* nor in *@total*) + @inflight : Int32 + + # Sync state + + # communicate that a connection is available for checkout + @availability_channel : Channel(Nil) + # signal how many existing connections are waited for + @waiting_resource : Int32 + # global pool mutex + @mutex : Mutex + def initialize(@initial_pool_size = 1, @max_pool_size = 0, @max_idle_pool_size = 1, @checkout_timeout = 5.0, @retry_attempts = 1, @retry_delay = 0.2, &@factory : -> T) - @initial_pool_size.times { build_resource } - @availability_channel = Channel(Nil).new @waiting_resource = 0 + @inflight = 0 @mutex = Mutex.new + + @initial_pool_size.times { build_resource } end # close all resources in the pool def close : Nil @total.each &.close @total.clear - @available.clear + @idle.clear end record Stats, open_connections : Int32 @@ -38,20 +62,28 @@ module DB end def checkout : T - resource = if @available.empty? - if can_increase_pool - build_resource + res = sync do + resource = if @idle.empty? + if can_increase_pool? + @inflight += 1 + r = unsync { build_resource } + @inflight -= 1 + r + else + unsync { wait_for_available } + pick_available + end else - wait_for_available pick_available end - else - pick_available - end - @available.delete resource - resource.before_checkout - resource + @idle.delete resource + + resource + end + + res.before_checkout + res end # ``` @@ -60,14 +92,14 @@ module DB # `selected` be a resource from the `candidates` list and `is_candidate` == `true` # or `selected` will be a new resource and `is_candidate` == `false` def checkout_some(candidates : Enumerable(WeakRef(T))) : {T, Bool} - # TODO honor candidates while waiting for availables - # this will allow us to remove `candidates.includes?(resource)` - candidates.each do |ref| - resource = ref.value - if resource && is_available?(resource) - @available.delete resource - resource.before_checkout - return {resource, true} + sync do + candidates.each do |ref| + resource = ref.value + if resource && is_available?(resource) + @idle.delete resource + resource.before_checkout + return {resource, true} + end end end @@ -76,13 +108,21 @@ module DB end def release(resource : T) : Nil - if can_increase_idle_pool - @available << resource - resource.after_release - @availability_channel.send nil if are_waiting_for_resource? - else - resource.close - @total.delete(resource) + idle_pushed = false + + sync do + if can_increase_idle_pool + @idle << resource + resource.after_release + idle_pushed = true + else + resource.close + @total.delete(resource) + end + end + + if idle_pushed && are_waiting_for_resource? + @availability_channel.send nil end end @@ -91,10 +131,14 @@ module DB # It will try to reuse all of the available connection right away, # but if a new connection is needed there is a `retry_delay` seconds delay. def retry - current_available = @available.size - # if the pool hasn't reach the max size, allow 1 attempt - # to make a new connection if needed without sleeping - current_available += 1 if can_increase_pool + current_available = 0 + + sync do + current_available = @idle.size + # if the pool hasn't reach the max size, allow 1 attempt + # to make a new connection if needed without sleeping + current_available += 1 if can_increase_pool? + end (current_available + @retry_attempts).times do |i| begin @@ -103,7 +147,7 @@ module DB rescue e : ConnectionLost # if the connection is lost close it to release resources # and remove it from the known pool. - delete(e.connection) + sync { delete(e.connection) } e.connection.close rescue e : ConnectionRefused # a ConnectionRefused means a new connection @@ -116,44 +160,46 @@ module DB # :nodoc: def each_resource - @available.each do |resource| - yield resource + sync do + @idle.each do |resource| + yield resource + end end end # :nodoc: def is_available?(resource : T) - @available.includes?(resource) + @idle.includes?(resource) end # :nodoc: def delete(resource : T) @total.delete(resource) - @available.delete(resource) + @idle.delete(resource) end private def build_resource : T resource = @factory.call @total << resource - @available << resource + @idle << resource resource end - private def can_increase_pool - @max_pool_size == 0 || @total.size < @max_pool_size + private def can_increase_pool? + @max_pool_size == 0 || @total.size + @inflight < @max_pool_size end private def can_increase_idle_pool - @available.size < @max_idle_pool_size + @idle.size < @max_idle_pool_size end private def pick_available - @available.first + @idle.first end private def wait_for_available timeout = TimeoutHelper.new(@checkout_timeout.to_f64) - inc_waiting_resource + sync_inc_waiting_resource timeout.start @@ -162,30 +208,42 @@ module DB case index when 0 timeout.cancel - dec_waiting_resource + sync_dec_waiting_resource when 1 - dec_waiting_resource + sync_dec_waiting_resource raise DB::PoolTimeout.new else raise DB::Error.new end end - private def inc_waiting_resource - @mutex.synchronize do - @waiting_resource += 1 - end + private def sync_inc_waiting_resource + sync { @waiting_resource += 1 } end - private def dec_waiting_resource - @mutex.synchronize do - @waiting_resource -= 1 - end + private def sync_dec_waiting_resource + sync { @waiting_resource -= 1 } end private def are_waiting_for_resource? - @mutex.synchronize do - @waiting_resource > 0 + @waiting_resource > 0 + end + + private def sync + @mutex.lock + begin + yield + ensure + @mutex.unlock + end + end + + private def unsync + @mutex.unlock + begin + yield + ensure + @mutex.lock end end From afad416417f31878f8378900aa2ad8a702cc8585 Mon Sep 17 00:00:00 2001 From: "Brian J. Cardiff" Date: Mon, 2 Sep 2019 14:14:46 -0300 Subject: [PATCH 3/3] Handle multiple fibers waiting for more resources than available --- src/db/pool.cr | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/src/db/pool.cr b/src/db/pool.cr index 7046cb0..5acffaa 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -63,19 +63,28 @@ module DB def checkout : T res = sync do - resource = if @idle.empty? - if can_increase_pool? - @inflight += 1 - r = unsync { build_resource } - @inflight -= 1 - r + resource = nil + + until resource + resource = if @idle.empty? + if can_increase_pool? + @inflight += 1 + r = unsync { build_resource } + @inflight -= 1 + r + else + unsync { wait_for_available } + # The wait for available can unlock + # multiple fibers waiting for a resource. + # Although only one will pick it due to the lock + # in the end of the unsync, the pick_available + # will return nil + pick_available + end else - unsync { wait_for_available } pick_available end - else - pick_available - end + end @idle.delete resource @@ -194,7 +203,7 @@ module DB end private def pick_available - @idle.first + @idle.first? end private def wait_for_available