mirror of
https://gitea.invidious.io/iv-org/shard-crystal-db.git
synced 2024-08-15 00:53:32 +00:00
Fix max_idle_pool_size race condition (#186)
* Add Fiber.yield to dummy driver to mimic real drivers IO * Add manual load test file * Fix race condition * Drop unused code * Less state, less bugs * Update spec/manual/load_test.cr Co-authored-by: Beta Ziliani <beta@manas.tech> --------- Co-authored-by: Beta Ziliani <beta@manas.tech>
This commit is contained in:
parent
ce95cd2257
commit
9471b33ffe
3 changed files with 63 additions and 21 deletions
|
@ -19,6 +19,7 @@ class DummyDriver < DB::Driver
|
|||
class DummyConnection < DB::Connection
|
||||
def initialize(options : DB::Connection::Options)
|
||||
super(options)
|
||||
Fiber.yield
|
||||
@connected = true
|
||||
@@connections ||= [] of DummyConnection
|
||||
@@connections.not_nil! << self
|
||||
|
@ -113,6 +114,7 @@ class DummyDriver < DB::Driver
|
|||
end
|
||||
|
||||
protected def perform_query(args : Enumerable) : DB::ResultSet
|
||||
Fiber.yield
|
||||
@connection.as(DummyConnection).check
|
||||
set_params args
|
||||
DummyResultSet.new self, command
|
||||
|
@ -161,6 +163,8 @@ class DummyDriver < DB::Driver
|
|||
|
||||
def initialize(statement, command)
|
||||
super(statement)
|
||||
Fiber.yield
|
||||
|
||||
@top_values = command.split.map { |r| r.split(',') }.to_a
|
||||
@column_count = @top_values.size > 0 ? @top_values[0].size : 2
|
||||
|
||||
|
|
54
spec/manual/load_test.cr
Normal file
54
spec/manual/load_test.cr
Normal file
|
@ -0,0 +1,54 @@
|
|||
# This file is to be executed as:
|
||||
#
|
||||
# % crystal ./spec/manual/load_test.cr
|
||||
#
|
||||
# It generates a number of producers and consumers. If the process hangs
|
||||
# it means that the connection pool is not working properly. Likely a race condition.
|
||||
#
|
||||
|
||||
require "../dummy_driver"
|
||||
require "../../src/db"
|
||||
require "json"
|
||||
|
||||
CONNECTION = "dummy://host?initial_pool_size=5&max_pool_size=5&max_idle_pool_size=5"
|
||||
|
||||
alias TChannel = Channel(Int32)
|
||||
alias TDone = Channel(Bool)
|
||||
|
||||
COUNT = 200
|
||||
|
||||
def start_consumer(channel : TChannel, done : TDone)
|
||||
spawn do
|
||||
indeces = Set(Int32).new
|
||||
loop do
|
||||
indeces << channel.receive
|
||||
puts "Received size=#{indeces.size}"
|
||||
break if indeces.size == COUNT
|
||||
end
|
||||
done.send true
|
||||
end
|
||||
end
|
||||
|
||||
def start_producers(channel : TChannel)
|
||||
db = DB.open CONNECTION do |db|
|
||||
sql = "1,title,description,2023 " * 100_000
|
||||
|
||||
COUNT.times do |index|
|
||||
spawn(name: "prod #{index}") do
|
||||
puts "Sending #{index}"
|
||||
_films = db.query_all(sql, as: {Int32, String, String, Int32})
|
||||
rescue ex
|
||||
puts "Error: #{ex.message}"
|
||||
ensure
|
||||
channel.send index
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
channel = TChannel.new
|
||||
done = TDone.new
|
||||
start_consumer(channel, done)
|
||||
start_producers(channel)
|
||||
|
||||
done.receive
|
|
@ -57,8 +57,6 @@ module DB
|
|||
|
||||
# communicate that a connection is available for checkout
|
||||
@availability_channel : Channel(Nil)
|
||||
# signal how many existing connections are waited for
|
||||
@waiting_resource : Int32
|
||||
# global pool mutex
|
||||
@mutex : Mutex
|
||||
|
||||
|
@ -82,7 +80,6 @@ module DB
|
|||
@retry_delay = pool_options.retry_delay
|
||||
|
||||
@availability_channel = Channel(Nil).new
|
||||
@waiting_resource = 0
|
||||
@inflight = 0
|
||||
@mutex = Mutex.new
|
||||
|
||||
|
@ -200,8 +197,11 @@ module DB
|
|||
end
|
||||
end
|
||||
|
||||
if idle_pushed && are_waiting_for_resource?
|
||||
@availability_channel.send nil
|
||||
if idle_pushed
|
||||
select
|
||||
when @availability_channel.send(nil)
|
||||
else
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -281,29 +281,13 @@ module DB
|
|||
end
|
||||
|
||||
private def wait_for_available
|
||||
sync_inc_waiting_resource
|
||||
|
||||
select
|
||||
when @availability_channel.receive
|
||||
sync_dec_waiting_resource
|
||||
when timeout(@checkout_timeout.seconds)
|
||||
sync_dec_waiting_resource
|
||||
raise DB::PoolTimeout.new("Could not check out a connection in #{@checkout_timeout} seconds")
|
||||
end
|
||||
end
|
||||
|
||||
private def sync_inc_waiting_resource
|
||||
sync { @waiting_resource += 1 }
|
||||
end
|
||||
|
||||
private def sync_dec_waiting_resource
|
||||
sync { @waiting_resource -= 1 }
|
||||
end
|
||||
|
||||
private def are_waiting_for_resource?
|
||||
@waiting_resource > 0
|
||||
end
|
||||
|
||||
private def sync
|
||||
@mutex.lock
|
||||
begin
|
||||
|
|
Loading…
Reference in a new issue