From 9471b33ffe6620f741a881affc3db41b34febb78 Mon Sep 17 00:00:00 2001 From: "Brian J. Cardiff" Date: Mon, 31 Jul 2023 11:04:18 -0300 Subject: [PATCH] Fix max_idle_pool_size race condition (#186) * Add Fiber.yield to dummy driver to mimic real drivers IO * Add manual load test file * Fix race condition * Drop unused code * Less state, less bugs * Update spec/manual/load_test.cr Co-authored-by: Beta Ziliani --------- Co-authored-by: Beta Ziliani --- spec/dummy_driver.cr | 4 +++ spec/manual/load_test.cr | 54 ++++++++++++++++++++++++++++++++++++++++ src/db/pool.cr | 26 ++++--------------- 3 files changed, 63 insertions(+), 21 deletions(-) create mode 100644 spec/manual/load_test.cr diff --git a/spec/dummy_driver.cr b/spec/dummy_driver.cr index 85f947f..9fe64dc 100644 --- a/spec/dummy_driver.cr +++ b/spec/dummy_driver.cr @@ -19,6 +19,7 @@ class DummyDriver < DB::Driver class DummyConnection < DB::Connection def initialize(options : DB::Connection::Options) super(options) + Fiber.yield @connected = true @@connections ||= [] of DummyConnection @@connections.not_nil! << self @@ -113,6 +114,7 @@ class DummyDriver < DB::Driver end protected def perform_query(args : Enumerable) : DB::ResultSet + Fiber.yield @connection.as(DummyConnection).check set_params args DummyResultSet.new self, command @@ -161,6 +163,8 @@ class DummyDriver < DB::Driver def initialize(statement, command) super(statement) + Fiber.yield + @top_values = command.split.map { |r| r.split(',') }.to_a @column_count = @top_values.size > 0 ? @top_values[0].size : 2 diff --git a/spec/manual/load_test.cr b/spec/manual/load_test.cr new file mode 100644 index 0000000..9d1e279 --- /dev/null +++ b/spec/manual/load_test.cr @@ -0,0 +1,54 @@ +# This file is to be executed as: +# +# % crystal ./spec/manual/load_test.cr +# +# It generates a number of producers and consumers. If the process hangs +# it means that the connection pool is not working properly. Likely a race condition. +# + +require "../dummy_driver" +require "../../src/db" +require "json" + +CONNECTION = "dummy://host?initial_pool_size=5&max_pool_size=5&max_idle_pool_size=5" + +alias TChannel = Channel(Int32) +alias TDone = Channel(Bool) + +COUNT = 200 + +def start_consumer(channel : TChannel, done : TDone) + spawn do + indeces = Set(Int32).new + loop do + indeces << channel.receive + puts "Received size=#{indeces.size}" + break if indeces.size == COUNT + end + done.send true + end +end + +def start_producers(channel : TChannel) + db = DB.open CONNECTION do |db| + sql = "1,title,description,2023 " * 100_000 + + COUNT.times do |index| + spawn(name: "prod #{index}") do + puts "Sending #{index}" + _films = db.query_all(sql, as: {Int32, String, String, Int32}) + rescue ex + puts "Error: #{ex.message}" + ensure + channel.send index + end + end + end +end + +channel = TChannel.new +done = TDone.new +start_consumer(channel, done) +start_producers(channel) + +done.receive diff --git a/src/db/pool.cr b/src/db/pool.cr index 9cc1de5..757e4e3 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -57,8 +57,6 @@ module DB # communicate that a connection is available for checkout @availability_channel : Channel(Nil) - # signal how many existing connections are waited for - @waiting_resource : Int32 # global pool mutex @mutex : Mutex @@ -82,7 +80,6 @@ module DB @retry_delay = pool_options.retry_delay @availability_channel = Channel(Nil).new - @waiting_resource = 0 @inflight = 0 @mutex = Mutex.new @@ -200,8 +197,11 @@ module DB end end - if idle_pushed && are_waiting_for_resource? - @availability_channel.send nil + if idle_pushed + select + when @availability_channel.send(nil) + else + end end end @@ -281,29 +281,13 @@ module DB end private def wait_for_available - sync_inc_waiting_resource - select when @availability_channel.receive - sync_dec_waiting_resource when timeout(@checkout_timeout.seconds) - sync_dec_waiting_resource raise DB::PoolTimeout.new("Could not check out a connection in #{@checkout_timeout} seconds") end end - private def sync_inc_waiting_resource - sync { @waiting_resource += 1 } - end - - private def sync_dec_waiting_resource - sync { @waiting_resource -= 1 } - end - - private def are_waiting_for_resource? - @waiting_resource > 0 - end - private def sync @mutex.lock begin