diff --git a/src/db/pool.cr b/src/db/pool.cr index 575c512..5acffaa 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -2,47 +2,97 @@ require "weak_ref" module DB class Pool(T) + # Pool configuration + + # initial number of connections in the pool @initial_pool_size : Int32 - # maximum amount of objects in the pool. Either available or in use. + # maximum amount of connections in the pool (Idle + InUse) @max_pool_size : Int32 - @available = Set(T).new - @total = [] of T + # maximum amount of idle connections in the pool + @max_idle_pool_size : Int32 + # seconds to wait before timeout while doing a checkout @checkout_timeout : Float64 - # maximum amount of retry attempts to reconnect to the db. See `Pool#retry`. + # maximum amount of retry attempts to reconnect to the db. See `Pool#retry` @retry_attempts : Int32 + # seconds to wait before a retry attempt @retry_delay : Float64 + # Pool state + + # total of open connections managed by this pool + @total = [] of T + # connections available for checkout + @idle = Set(T).new + # connections waiting to be stablished (they are not in *@idle* nor in *@total*) + @inflight : Int32 + + # Sync state + + # communicate that a connection is available for checkout + @availability_channel : Channel(Nil) + # signal how many existing connections are waited for + @waiting_resource : Int32 + # global pool mutex + @mutex : Mutex + def initialize(@initial_pool_size = 1, @max_pool_size = 0, @max_idle_pool_size = 1, @checkout_timeout = 5.0, @retry_attempts = 1, @retry_delay = 0.2, &@factory : -> T) - @initial_pool_size.times { build_resource } - @availability_channel = Channel(Nil).new @waiting_resource = 0 + @inflight = 0 @mutex = Mutex.new + + @initial_pool_size.times { build_resource } end # close all resources in the pool def close : Nil @total.each &.close @total.clear - @available.clear + @idle.clear + end + + record Stats, open_connections : Int32 + + # Returns stats of the pool + def stats + Stats.new( + open_connections: @total.size + ) end def checkout : T - resource = if @available.empty? - if can_increase_pool - build_resource - else - wait_for_available - pick_available - end - else - pick_available - end + res = sync do + resource = nil - @available.delete resource - resource.before_checkout - resource + until resource + resource = if @idle.empty? + if can_increase_pool? + @inflight += 1 + r = unsync { build_resource } + @inflight -= 1 + r + else + unsync { wait_for_available } + # The wait for available can unlock + # multiple fibers waiting for a resource. + # Although only one will pick it due to the lock + # in the end of the unsync, the pick_available + # will return nil + pick_available + end + else + pick_available + end + end + + @idle.delete resource + + resource + end + + res.before_checkout + res end # ``` @@ -51,14 +101,14 @@ module DB # `selected` be a resource from the `candidates` list and `is_candidate` == `true` # or `selected` will be a new resource and `is_candidate` == `false` def checkout_some(candidates : Enumerable(WeakRef(T))) : {T, Bool} - # TODO honor candidates while waiting for availables - # this will allow us to remove `candidates.includes?(resource)` - candidates.each do |ref| - resource = ref.value - if resource && is_available?(resource) - @available.delete resource - resource.before_checkout - return {resource, true} + sync do + candidates.each do |ref| + resource = ref.value + if resource && is_available?(resource) + @idle.delete resource + resource.before_checkout + return {resource, true} + end end end @@ -67,13 +117,21 @@ module DB end def release(resource : T) : Nil - if can_increase_idle_pool - @available << resource - resource.after_release - @availability_channel.send nil if are_waiting_for_resource? - else - resource.close - @total.delete(resource) + idle_pushed = false + + sync do + if can_increase_idle_pool + @idle << resource + resource.after_release + idle_pushed = true + else + resource.close + @total.delete(resource) + end + end + + if idle_pushed && are_waiting_for_resource? + @availability_channel.send nil end end @@ -82,10 +140,14 @@ module DB # It will try to reuse all of the available connection right away, # but if a new connection is needed there is a `retry_delay` seconds delay. def retry - current_available = @available.size - # if the pool hasn't reach the max size, allow 1 attempt - # to make a new connection if needed without sleeping - current_available += 1 if can_increase_pool + current_available = 0 + + sync do + current_available = @idle.size + # if the pool hasn't reach the max size, allow 1 attempt + # to make a new connection if needed without sleeping + current_available += 1 if can_increase_pool? + end (current_available + @retry_attempts).times do |i| begin @@ -94,7 +156,7 @@ module DB rescue e : ConnectionLost # if the connection is lost close it to release resources # and remove it from the known pool. - delete(e.connection) + sync { delete(e.connection) } e.connection.close rescue e : ConnectionRefused # a ConnectionRefused means a new connection @@ -107,44 +169,46 @@ module DB # :nodoc: def each_resource - @available.each do |resource| - yield resource + sync do + @idle.each do |resource| + yield resource + end end end # :nodoc: def is_available?(resource : T) - @available.includes?(resource) + @idle.includes?(resource) end # :nodoc: def delete(resource : T) @total.delete(resource) - @available.delete(resource) + @idle.delete(resource) end private def build_resource : T resource = @factory.call @total << resource - @available << resource + @idle << resource resource end - private def can_increase_pool - @max_pool_size == 0 || @total.size < @max_pool_size + private def can_increase_pool? + @max_pool_size == 0 || @total.size + @inflight < @max_pool_size end private def can_increase_idle_pool - @available.size < @max_idle_pool_size + @idle.size < @max_idle_pool_size end private def pick_available - @available.first + @idle.first? end private def wait_for_available timeout = TimeoutHelper.new(@checkout_timeout.to_f64) - inc_waiting_resource + sync_inc_waiting_resource timeout.start @@ -153,30 +217,42 @@ module DB case index when 0 timeout.cancel - dec_waiting_resource + sync_dec_waiting_resource when 1 - dec_waiting_resource + sync_dec_waiting_resource raise DB::PoolTimeout.new else raise DB::Error.new end end - private def inc_waiting_resource - @mutex.synchronize do - @waiting_resource += 1 - end + private def sync_inc_waiting_resource + sync { @waiting_resource += 1 } end - private def dec_waiting_resource - @mutex.synchronize do - @waiting_resource -= 1 - end + private def sync_dec_waiting_resource + sync { @waiting_resource -= 1 } end private def are_waiting_for_resource? - @mutex.synchronize do - @waiting_resource > 0 + @waiting_resource > 0 + end + + private def sync + @mutex.lock + begin + yield + ensure + @mutex.unlock + end + end + + private def unsync + @mutex.unlock + begin + yield + ensure + @mutex.lock end end