diff --git a/spec/database_spec.cr b/spec/database_spec.cr index a9ccef8..38f963b 100644 --- a/spec/database_spec.cr +++ b/spec/database_spec.cr @@ -57,14 +57,6 @@ describe DB::Database do end end - it "should close pool statements when closing db" do - stmt = uninitialized DB::PoolStatement - with_dummy do |db| - stmt = db.build("query1") - end - stmt.closed?.should be_true - end - it "should not reconnect if connection is lost and retry_attempts=0" do DummyDriver::DummyConnection.clear_connections DB.open "dummy://localhost:1027?initial_pool_size=1&max_pool_size=1&retry_attempts=0" do |db| @@ -187,6 +179,25 @@ describe DB::Database do end end + it "should not checkout multiple connections if there is a statement error" do + with_dummy "dummy://localhost:1027?initial_pool_size=1&max_pool_size=10&retry_attempts=10" do |db| + expect_raises DB::Error do + db.exec("syntax error") + end + DummyDriver::DummyConnection.connections.size.should eq(1) + end + end + + it "should attempt all retries if connection is lost" do + with_dummy "dummy://localhost:1027?initial_pool_size=1&max_pool_size=1&retry_attempts=10" do |db| + expect_raises DB::PoolRetryAttemptsExceeded do + db.exec("raise ConnectionLost") + end + # 1 initial + 10 retries + DummyDriver::DummyConnection.connections.size.should eq(11) + end + end + describe "prepared_statements connection option" do it "defaults to true" do with_dummy "dummy://localhost:1027" do |db| @@ -239,24 +250,6 @@ describe DB::Database do end end - describe "prepared_statements_cache connection option" do - it "should reuse prepared statements if true" do - with_dummy "dummy://localhost:1027?prepared_statements=true&prepared_statements_cache=true" do |db| - stmt1 = db.build("the query") - stmt2 = db.build("the query") - stmt1.object_id.should eq(stmt2.object_id) - end - end - - it "should not reuse prepared statements if false" do - with_dummy "dummy://localhost:1027?prepared_statements=true&prepared_statements_cache=false" do |db| - stmt1 = db.build("the query") - stmt2 = db.build("the query") - stmt1.object_id.should_not eq(stmt2.object_id) - end - end - end - describe "unprepared statements in pool" do it "creating statements should not create new connections" do with_dummy "dummy://localhost:1027?initial_pool_size=1" do |db| diff --git a/spec/dummy_driver.cr b/spec/dummy_driver.cr index da6417c..84ab5b7 100644 --- a/spec/dummy_driver.cr +++ b/spec/dummy_driver.cr @@ -1,4 +1,3 @@ -require "spec" require "../src/db" class DummyDriver < DB::Driver @@ -17,20 +16,37 @@ class DummyDriver < DB::Driver end class DummyConnection < DB::Connection + @@connections = [] of DummyConnection + @@connections_count = Atomic(Int32).new(0) + def initialize(options : DB::Connection::Options) super(options) Fiber.yield + @@connections_count.add(1) @connected = true - @@connections ||= [] of DummyConnection - @@connections.not_nil! << self + {% unless flag?(:preview_mt) %} + # @@connections is only used in single-threaded mode in specs + # for benchmarks we want to avoid the overhead of synchronizing this array + @@connections << self + {% end %} + end + + def self.connections_count + @@connections_count.get end def self.connections - @@connections.not_nil! + {% if flag?(:preview_mt) %} + raise "DummyConnection.connections is only available in single-threaded mode" + {% end %} + @@connections end def self.clear_connections - @@connections.try &.clear + {% if flag?(:preview_mt) %} + raise "DummyConnection.clear_connections is only available in single-threaded mode" + {% end %} + @@connections.clear end def build_prepared_statement(query) : DB::Statement @@ -117,17 +133,31 @@ class DummyDriver < DB::Driver end class DummyStatement < DB::Statement + @@statements_count = Atomic(Int32).new(0) + @@statements_exec_count = Atomic(Int32).new(0) property params def initialize(connection, command : String, @prepared : Bool) @params = Hash(Int32 | String, DB::Any | Array(DB::Any)).new super(connection, command) + @@statements_count.add(1) raise DB::Error.new(command) if command == "syntax error" + raise DB::ConnectionLost.new(connection) if command == "raise ConnectionLost" + end + + def self.statements_count + @@statements_count.get + end + + def self.statements_exec_count + @@statements_exec_count.get end protected def perform_query(args : Enumerable) : DB::ResultSet assert_not_closed! + @@statements_exec_count.add(1) + Fiber.yield @connection.as(DummyConnection).check set_params args @@ -137,6 +167,8 @@ class DummyDriver < DB::Driver protected def perform_exec(args : Enumerable) : DB::ExecResult assert_not_closed! + @@statements_exec_count.add(1) + @connection.as(DummyConnection).check set_params args raise DB::Error.new("forced exception due to query") if command == "raise" diff --git a/spec/manual/pool_concurrency_test.cr b/spec/manual/pool_concurrency_test.cr new file mode 100644 index 0000000..a36f96b --- /dev/null +++ b/spec/manual/pool_concurrency_test.cr @@ -0,0 +1,68 @@ +# This file is to be executed as: +# +# % crystal run --release [-Dpreview_mt] ./spec/manual/pool_concurrency_test.cr -- --options="max_pool_size=5" --duration=30 --concurrency=4 +# +# + +require "option_parser" +require "../dummy_driver" +require "../../src/db" + +options = "" +duration = 3 +concurrency = 4 + +OptionParser.parse do |parser| + parser.banner = "Usage: pool_concurrency_test [arguments]" + parser.on("-o", "--options=VALUE", "Connection string options") { |v| options = v } + parser.on("-d", "--duration=SECONDS", "Specifies the duration in seconds") { |v| duration = v.to_i } + parser.on("-c", "--concurrency=VALUE", "Specifies the concurrent requests to perform") { |v| concurrency = v.to_i } + parser.on("-h", "--help", "Show this help") do + puts parser + exit + end + parser.invalid_option do |flag| + STDERR.puts "ERROR: #{flag} is not a valid option." + STDERR.puts parser + exit(1) + end +end + +multi_threaded = {% if flag?(:preview_mt) %} ENV["CRYSTAL_WORKERS"]?.try(&.to_i?) || 4 {% else %} false {% end %} +release = {% if flag?(:release) %} true {% else %} false {% end %} + +if !release + puts "WARNING: This should be run in release mode." +end + +db = DB.open "dummy://host?#{options}" + +start_time = Time.monotonic + +puts "Starting test for #{duration} seconds..." + +concurrency.times do + spawn do + loop do + db.scalar "1" + Fiber.yield + end + end +end + +sleep duration.seconds + +end_time = Time.monotonic + +puts " Options : #{options}" +puts " Duration (sec) : #{duration} (actual #{end_time - start_time})" +puts " Concurrency : #{concurrency}" +puts " Multi Threaded : #{multi_threaded ? "Yes (#{multi_threaded})" : "No"}" +puts "Total Connections : #{DummyDriver::DummyConnection.connections_count}" +puts " Total Statements : #{DummyDriver::DummyStatement.statements_count}" +puts " Total Queries : #{DummyDriver::DummyStatement.statements_exec_count}" +puts " Throughput (q/s) : #{DummyDriver::DummyStatement.statements_exec_count / duration}" + +if !release + puts "WARNING: This should be run in release mode." +end diff --git a/src/db/database.cr b/src/db/database.cr index 79741d5..16e1e7b 100644 --- a/src/db/database.cr +++ b/src/db/database.cr @@ -38,7 +38,6 @@ module DB @connection_options : Connection::Options @pool : Pool(Connection) @setup_connection : Connection -> Nil - @statements_cache = StringKeyCache(PoolPreparedStatement).new # Initialize a database with the specified options and connection factory. # This covers more advanced use cases that might not be supported by an URI connection string such as tunneling connection. @@ -81,9 +80,6 @@ module DB # Closes all connection to the database. def close - @statements_cache.each_value &.close - @statements_cache.clear - @pool.close end @@ -99,15 +95,6 @@ module DB # :nodoc: def fetch_or_build_prepared_statement(query) : PoolStatement - if @connection_options.prepared_statements_cache - @statements_cache.fetch(query) { build_prepared_statement(query) } - else - build_prepared_statement(query) - end - end - - # :nodoc: - def build_prepared_statement(query) : PoolStatement PoolPreparedStatement.new(self, query) end diff --git a/src/db/pool.cr b/src/db/pool.cr index 757e4e3..378e303 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -158,27 +158,6 @@ module DB end end - # ``` - # selected, is_candidate = pool.checkout_some(candidates) - # ``` - # `selected` be a resource from the `candidates` list and `is_candidate` == `true` - # or `selected` will be a new resource and `is_candidate` == `false` - def checkout_some(candidates : Enumerable(WeakRef(T))) : {T, Bool} - sync do - candidates.each do |ref| - resource = ref.value - if resource && is_available?(resource) - @idle.delete resource - resource.before_checkout - return {resource, true} - end - end - end - - resource = checkout - {resource, candidates.any? { |ref| ref.value == resource }} - end - def release(resource : T) : Nil idle_pushed = false @@ -227,8 +206,6 @@ module DB # if the connection is lost it will be closed by # the exception to release resources # we still need to remove it from the known pool. - # Closed connection will be evicted from statement cache - # in PoolPreparedStatement#clean_connections sync { delete(e.resource) } rescue e : PoolResourceRefused # a ConnectionRefused means a new connection diff --git a/src/db/pool_prepared_statement.cr b/src/db/pool_prepared_statement.cr index 388b3b2..dea14ca 100644 --- a/src/db/pool_prepared_statement.cr +++ b/src/db/pool_prepared_statement.cr @@ -4,75 +4,20 @@ module DB # The execution of the statement is retried according to the pool configuration. # # See `PoolStatement` - class PoolPreparedStatement < PoolStatement - # connections where the statement was prepared - @connections = Set(WeakRef(Connection)).new - @mutex = Mutex.new - + struct PoolPreparedStatement < PoolStatement def initialize(db : Database, query : String) super - # Prepares a statement on some connection - # otherwise the preparation is delayed until the first execution. - # After the first initialization the connection must be released - # it will be checked out when executing it. - - # This only happens if the db is configured to use prepared statements cache. - # Without that there is no reference to the already prepared statement we can - # take advantage of. - if db.prepared_statements_cache? - statement_with_retry &.release_connection - end - - # TODO use a round-robin selection in the pool so multiple sequentially - # initialized statements are assigned to different connections. - end - - protected def do_close - @mutex.synchronize do - # TODO close all statements on all connections. - # currently statements are closed when the connection is closed. - - # WHAT-IF the connection is busy? Should each statement be able to - # deallocate itself when the connection is free. - @connections.clear - end end # builds a statement over a real connection - # the connection is registered in `@connections` private def build_statement : Statement - clean_connections - - conn, existing = @mutex.synchronize do - @db.checkout_some(@connections) - end - + conn = @db.pool.checkout begin - stmt = conn.prepared.build(@query) + conn.prepared.build(@query) rescue ex conn.release raise ex end - if !existing && @db.prepared_statements_cache? - @mutex.synchronize do - @connections << WeakRef.new(conn) - end - end - stmt - end - - private def clean_connections - return unless @db.prepared_statements_cache? - - @mutex.synchronize do - # remove disposed or closed connections - @connections.each do |ref| - conn = ref.value - if !conn || conn.closed? - @connections.delete ref - end - end - end end end end diff --git a/src/db/pool_statement.cr b/src/db/pool_statement.cr index f0fc2b2..20a8214 100644 --- a/src/db/pool_statement.cr +++ b/src/db/pool_statement.cr @@ -3,7 +3,7 @@ module DB # a statement from the DB needs to be able to represent a statement in any # of the connections of the pool. Otherwise the user will need to deal with # actual connections in some point. - abstract class PoolStatement + abstract struct PoolStatement include StatementMethods def initialize(@db : Database, @query : String) diff --git a/src/db/pool_unprepared_statement.cr b/src/db/pool_unprepared_statement.cr index c58fafd..fcf56d8 100644 --- a/src/db/pool_unprepared_statement.cr +++ b/src/db/pool_unprepared_statement.cr @@ -4,15 +4,11 @@ module DB # The execution of the statement is retried according to the pool configuration. # # See `PoolStatement` - class PoolUnpreparedStatement < PoolStatement + struct PoolUnpreparedStatement < PoolStatement def initialize(db : Database, query : String) super end - protected def do_close - # unprepared statements do not need to be release in each connection - end - # builds a statement over a real connection private def build_statement : Statement conn = @db.pool.checkout diff --git a/src/db/statement.cr b/src/db/statement.cr index 6602e6d..b3b32f6 100644 --- a/src/db/statement.cr +++ b/src/db/statement.cr @@ -2,11 +2,6 @@ module DB # Common interface for connection based statements # and for connection pool statements. module StatementMethods - include Disposable - - protected def do_close - end - # See `QueryMethods#scalar` def scalar(*args_, args : Array? = nil) query(*args_, args: args) do |rs| @@ -47,6 +42,10 @@ module DB # 6. `#do_close` is called to release the statement resources. abstract class Statement include StatementMethods + include Disposable + + protected def do_close + end # :nodoc: getter connection diff --git a/src/db/string_key_cache.cr b/src/db/string_key_cache.cr index ef77d67..f2cae62 100644 --- a/src/db/string_key_cache.cr +++ b/src/db/string_key_cache.cr @@ -1,28 +1,21 @@ module DB class StringKeyCache(T) @cache = {} of String => T - @mutex = Mutex.new def fetch(key : String) : T - @mutex.synchronize do - value = @cache.fetch(key, nil) - value = @cache[key] = yield unless value - value - end + value = @cache.fetch(key, nil) + value = @cache[key] = yield unless value + value end def each_value - @mutex.synchronize do - @cache.each do |_, value| - yield value - end + @cache.each do |_, value| + yield value end end def clear - @mutex.synchronize do - @cache.clear - end + @cache.clear end end end