Merge pull request #109 from crystal-lang/pool-lock

Rework pool lock logic to handle new connections context switches
This commit is contained in:
Brian J. Cardiff 2019-09-12 09:59:49 -03:00 committed by GitHub
commit ff5c3263ad
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -2,47 +2,97 @@ require "weak_ref"
module DB module DB
class Pool(T) class Pool(T)
# Pool configuration
# initial number of connections in the pool
@initial_pool_size : Int32 @initial_pool_size : Int32
# maximum amount of objects in the pool. Either available or in use. # maximum amount of connections in the pool (Idle + InUse)
@max_pool_size : Int32 @max_pool_size : Int32
@available = Set(T).new # maximum amount of idle connections in the pool
@total = [] of T @max_idle_pool_size : Int32
# seconds to wait before timeout while doing a checkout
@checkout_timeout : Float64 @checkout_timeout : Float64
# maximum amount of retry attempts to reconnect to the db. See `Pool#retry`. # maximum amount of retry attempts to reconnect to the db. See `Pool#retry`
@retry_attempts : Int32 @retry_attempts : Int32
# seconds to wait before a retry attempt
@retry_delay : Float64 @retry_delay : Float64
# Pool state
# total of open connections managed by this pool
@total = [] of T
# connections available for checkout
@idle = Set(T).new
# connections waiting to be stablished (they are not in *@idle* nor in *@total*)
@inflight : Int32
# Sync state
# communicate that a connection is available for checkout
@availability_channel : Channel(Nil)
# signal how many existing connections are waited for
@waiting_resource : Int32
# global pool mutex
@mutex : Mutex
def initialize(@initial_pool_size = 1, @max_pool_size = 0, @max_idle_pool_size = 1, @checkout_timeout = 5.0, def initialize(@initial_pool_size = 1, @max_pool_size = 0, @max_idle_pool_size = 1, @checkout_timeout = 5.0,
@retry_attempts = 1, @retry_delay = 0.2, &@factory : -> T) @retry_attempts = 1, @retry_delay = 0.2, &@factory : -> T)
@initial_pool_size.times { build_resource }
@availability_channel = Channel(Nil).new @availability_channel = Channel(Nil).new
@waiting_resource = 0 @waiting_resource = 0
@inflight = 0
@mutex = Mutex.new @mutex = Mutex.new
@initial_pool_size.times { build_resource }
end end
# close all resources in the pool # close all resources in the pool
def close : Nil def close : Nil
@total.each &.close @total.each &.close
@total.clear @total.clear
@available.clear @idle.clear
end
record Stats, open_connections : Int32
# Returns stats of the pool
def stats
Stats.new(
open_connections: @total.size
)
end end
def checkout : T def checkout : T
resource = if @available.empty? res = sync do
if can_increase_pool resource = nil
build_resource
else
wait_for_available
pick_available
end
else
pick_available
end
@available.delete resource until resource
resource.before_checkout resource = if @idle.empty?
resource if can_increase_pool?
@inflight += 1
r = unsync { build_resource }
@inflight -= 1
r
else
unsync { wait_for_available }
# The wait for available can unlock
# multiple fibers waiting for a resource.
# Although only one will pick it due to the lock
# in the end of the unsync, the pick_available
# will return nil
pick_available
end
else
pick_available
end
end
@idle.delete resource
resource
end
res.before_checkout
res
end end
# ``` # ```
@ -51,14 +101,14 @@ module DB
# `selected` be a resource from the `candidates` list and `is_candidate` == `true` # `selected` be a resource from the `candidates` list and `is_candidate` == `true`
# or `selected` will be a new resource and `is_candidate` == `false` # or `selected` will be a new resource and `is_candidate` == `false`
def checkout_some(candidates : Enumerable(WeakRef(T))) : {T, Bool} def checkout_some(candidates : Enumerable(WeakRef(T))) : {T, Bool}
# TODO honor candidates while waiting for availables sync do
# this will allow us to remove `candidates.includes?(resource)` candidates.each do |ref|
candidates.each do |ref| resource = ref.value
resource = ref.value if resource && is_available?(resource)
if resource && is_available?(resource) @idle.delete resource
@available.delete resource resource.before_checkout
resource.before_checkout return {resource, true}
return {resource, true} end
end end
end end
@ -67,13 +117,21 @@ module DB
end end
def release(resource : T) : Nil def release(resource : T) : Nil
if can_increase_idle_pool idle_pushed = false
@available << resource
resource.after_release sync do
@availability_channel.send nil if are_waiting_for_resource? if can_increase_idle_pool
else @idle << resource
resource.close resource.after_release
@total.delete(resource) idle_pushed = true
else
resource.close
@total.delete(resource)
end
end
if idle_pushed && are_waiting_for_resource?
@availability_channel.send nil
end end
end end
@ -82,10 +140,14 @@ module DB
# It will try to reuse all of the available connection right away, # It will try to reuse all of the available connection right away,
# but if a new connection is needed there is a `retry_delay` seconds delay. # but if a new connection is needed there is a `retry_delay` seconds delay.
def retry def retry
current_available = @available.size current_available = 0
# if the pool hasn't reach the max size, allow 1 attempt
# to make a new connection if needed without sleeping sync do
current_available += 1 if can_increase_pool current_available = @idle.size
# if the pool hasn't reach the max size, allow 1 attempt
# to make a new connection if needed without sleeping
current_available += 1 if can_increase_pool?
end
(current_available + @retry_attempts).times do |i| (current_available + @retry_attempts).times do |i|
begin begin
@ -94,7 +156,7 @@ module DB
rescue e : ConnectionLost rescue e : ConnectionLost
# if the connection is lost close it to release resources # if the connection is lost close it to release resources
# and remove it from the known pool. # and remove it from the known pool.
delete(e.connection) sync { delete(e.connection) }
e.connection.close e.connection.close
rescue e : ConnectionRefused rescue e : ConnectionRefused
# a ConnectionRefused means a new connection # a ConnectionRefused means a new connection
@ -107,44 +169,46 @@ module DB
# :nodoc: # :nodoc:
def each_resource def each_resource
@available.each do |resource| sync do
yield resource @idle.each do |resource|
yield resource
end
end end
end end
# :nodoc: # :nodoc:
def is_available?(resource : T) def is_available?(resource : T)
@available.includes?(resource) @idle.includes?(resource)
end end
# :nodoc: # :nodoc:
def delete(resource : T) def delete(resource : T)
@total.delete(resource) @total.delete(resource)
@available.delete(resource) @idle.delete(resource)
end end
private def build_resource : T private def build_resource : T
resource = @factory.call resource = @factory.call
@total << resource @total << resource
@available << resource @idle << resource
resource resource
end end
private def can_increase_pool private def can_increase_pool?
@max_pool_size == 0 || @total.size < @max_pool_size @max_pool_size == 0 || @total.size + @inflight < @max_pool_size
end end
private def can_increase_idle_pool private def can_increase_idle_pool
@available.size < @max_idle_pool_size @idle.size < @max_idle_pool_size
end end
private def pick_available private def pick_available
@available.first @idle.first?
end end
private def wait_for_available private def wait_for_available
timeout = TimeoutHelper.new(@checkout_timeout.to_f64) timeout = TimeoutHelper.new(@checkout_timeout.to_f64)
inc_waiting_resource sync_inc_waiting_resource
timeout.start timeout.start
@ -153,30 +217,42 @@ module DB
case index case index
when 0 when 0
timeout.cancel timeout.cancel
dec_waiting_resource sync_dec_waiting_resource
when 1 when 1
dec_waiting_resource sync_dec_waiting_resource
raise DB::PoolTimeout.new raise DB::PoolTimeout.new
else else
raise DB::Error.new raise DB::Error.new
end end
end end
private def inc_waiting_resource private def sync_inc_waiting_resource
@mutex.synchronize do sync { @waiting_resource += 1 }
@waiting_resource += 1
end
end end
private def dec_waiting_resource private def sync_dec_waiting_resource
@mutex.synchronize do sync { @waiting_resource -= 1 }
@waiting_resource -= 1
end
end end
private def are_waiting_for_resource? private def are_waiting_for_resource?
@mutex.synchronize do @waiting_resource > 0
@waiting_resource > 0 end
private def sync
@mutex.lock
begin
yield
ensure
@mutex.unlock
end
end
private def unsync
@mutex.unlock
begin
yield
ensure
@mutex.lock
end end
end end