diff --git a/spec/database_spec.cr b/spec/database_spec.cr new file mode 100644 index 0000000..46483e3 --- /dev/null +++ b/spec/database_spec.cr @@ -0,0 +1,90 @@ +require "./spec_helper" + +describe DB::Database do + it "allows connection initialization" do + cnn_setup = 0 + DB.open "dummy://localhost:1027?initial_pool_size=2&max_pool_size=4&max_idle_pool_size=1" do |db| + cnn_setup.should eq(0) + + db.setup_connection do |cnn| + cnn_setup += 1 + end + + cnn_setup.should eq(2) + + db.using_connection do + cnn_setup.should eq(2) + db.using_connection do + cnn_setup.should eq(2) + db.using_connection do + cnn_setup.should eq(3) + db.using_connection do + cnn_setup.should eq(4) + end + # the pool didn't shrink no new initialization should be done next + db.using_connection do + cnn_setup.should eq(4) + end + end + # the pool shrink 1. max_idle_pool_size=1 + # after the previous end there where 2. + db.using_connection do + cnn_setup.should eq(4) + # so now there will be a new connection created + db.using_connection do + cnn_setup.should eq(5) + end + end + end + end + end + end + + it "should allow creation of more statements than pool connections" do + DB.open "dummy://localhost:1027?initial_pool_size=1&max_pool_size=2" do |db| + db.prepare("query1").should be_a(DB::PoolStatement) + db.prepare("query2").should be_a(DB::PoolStatement) + db.prepare("query3").should be_a(DB::PoolStatement) + end + end + + it "should return same statement in pool per query" do + with_dummy do |db| + stmt = db.prepare("query1") + db.prepare("query2").should_not eq(stmt) + db.prepare("query1").should eq(stmt) + end + end + + it "should close pool statements when closing db" do + stmt = uninitialized DB::PoolStatement + with_dummy do |db| + stmt = db.prepare("query1") + end + stmt.closed?.should be_true + end + + it "should not reconnect if connection is lost and retry_attempts=0" do + DummyDriver::DummyConnection.clear_connections + DB.open "dummy://localhost:1027?initial_pool_size=1&max_pool_size=1&retry_attempts=0" do |db| + db.exec("stmt1") + DummyDriver::DummyConnection.connections.size.should eq(1) + DummyDriver::DummyConnection.connections.first.disconnect! + expect_raises DB::PoolRetryAttemptsExceeded do + db.exec("stmt1") + end + DummyDriver::DummyConnection.connections.size.should eq(1) + end + end + + it "should reconnect if connection is lost and executing same statement" do + DummyDriver::DummyConnection.clear_connections + DB.open "dummy://localhost:1027?initial_pool_size=1&max_pool_size=1&retry_attempts=1" do |db| + db.exec("stmt1") + DummyDriver::DummyConnection.connections.size.should eq(1) + DummyDriver::DummyConnection.connections.first.disconnect! + db.exec("stmt1") + DummyDriver::DummyConnection.connections.size.should eq(2) + end + end +end diff --git a/spec/db_spec.cr b/spec/db_spec.cr index 0fb8f4c..5094f4e 100644 --- a/spec/db_spec.cr +++ b/spec/db_spec.cr @@ -61,7 +61,7 @@ describe DB do it "should raise if the sole connection is been used" do with_dummy do |db| db.query "1" do |rs| - expect_raises Exception, /DB Pool Exhausted/ do + expect_raises DB::PoolTimeout do db.scalar "2" end end diff --git a/spec/dummy_driver.cr b/spec/dummy_driver.cr index ceaf38e..0b4a239 100644 --- a/spec/dummy_driver.cr +++ b/spec/dummy_driver.cr @@ -9,6 +9,7 @@ class DummyDriver < DB::Driver class DummyConnection < DB::Connection def initialize(db) super(db) + @connected = true @@connections ||= [] of DummyConnection @@connections.not_nil! << self end @@ -29,6 +30,14 @@ class DummyDriver < DB::Driver 0 end + def check + raise DB::ConnectionLost.new(self) unless @connected + end + + def disconnect! + @connected = false + end + protected def do_close super end @@ -43,11 +52,13 @@ class DummyDriver < DB::Driver end protected def perform_query(args : Enumerable) + @connection.as(DummyConnection).check set_params args DummyResultSet.new self, @query end protected def perform_exec(args : Enumerable) + @connection.as(DummyConnection).check set_params args raise "forced exception due to query" if @query == "raise" DB::ExecResult.new 0i64, 0_i64 @@ -188,7 +199,15 @@ end def with_dummy DummyDriver::DummyConnection.clear_connections - DB.open "dummy://host" do |db| + DB.open "dummy://host?checkout_timeout=0.5" do |db| yield db end end + +def with_dummy_connection + with_dummy do |db| + db.using_connection do |cnn| + yield cnn.as(DummyDriver::DummyConnection) + end + end +end diff --git a/spec/pool_spec.cr b/spec/pool_spec.cr new file mode 100644 index 0000000..1f59aaa --- /dev/null +++ b/spec/pool_spec.cr @@ -0,0 +1,192 @@ +require "./spec_helper" + +class ShouldSleepingOp + @is_sleeping = false + getter is_sleeping + getter sleep_happened + + def initialize + @sleep_happened = Channel(Nil).new + end + + def should_sleep + s = self + @is_sleeping = true + spawn do + sleep 0.1 + s.is_sleeping.should be_true + s.sleep_happened.send(nil) + end + yield + @is_sleeping = false + end + + def wait_for_sleep + @sleep_happened.receive + end +end + +class WaitFor + def initialize + @channel = Channel(Nil).new + end + + def wait + @channel.receive + end + + def check + @channel.send(nil) + end +end + +class Closable + include DB::Disposable + + protected def do_close + end +end + +describe DB::Pool do + it "should use proc to create objects" do + block_called = 0 + pool = DB::Pool.new(initial_pool_size: 3) { block_called += 1; Closable.new } + block_called.should eq(3) + end + + it "should get resource" do + pool = DB::Pool.new { Closable.new } + pool.checkout.should be_a Closable + end + + it "should be available if not checkedout" do + resource = uninitialized Closable + pool = DB::Pool.new(initial_pool_size: 1) { resource = Closable.new } + pool.is_available?(resource).should be_true + end + + it "should not be available if checkedout" do + pool = DB::Pool.new { Closable.new } + resource = pool.checkout + pool.is_available?(resource).should be_false + end + + it "should be available if returned" do + pool = DB::Pool.new { Closable.new } + resource = pool.checkout + pool.release resource + pool.is_available?(resource).should be_true + end + + it "should wait for available resource" do + pool = DB::Pool.new(max_pool_size: 1, initial_pool_size: 1) { Closable.new } + + b_cnn_request = ShouldSleepingOp.new + wait_a = WaitFor.new + wait_b = WaitFor.new + + spawn do + a_cnn = pool.checkout + b_cnn_request.wait_for_sleep + pool.release a_cnn + + wait_a.check + end + + spawn do + b_cnn_request.should_sleep do + pool.checkout + end + + wait_b.check + end + + wait_a.wait + wait_b.wait + end + + it "should create new if max was not reached" do + block_called = 0 + pool = DB::Pool.new(max_pool_size: 2, initial_pool_size: 1) { block_called += 1; Closable.new } + block_called.should eq 1 + pool.checkout + block_called.should eq 1 + pool.checkout + block_called.should eq 2 + end + + it "should reuse returned resources" do + all = [] of Closable + pool = DB::Pool.new(max_pool_size: 2, initial_pool_size: 1) { Closable.new.tap { |c| all << c } } + pool.checkout + b1 = pool.checkout + pool.release b1 + b2 = pool.checkout + + b1.should eq b2 + all.size.should eq 2 + end + + it "should close available and total" do + all = [] of Closable + pool = DB::Pool.new(max_pool_size: 2, initial_pool_size: 1) { Closable.new.tap { |c| all << c } } + a = pool.checkout + b = pool.checkout + pool.release b + all.size.should eq 2 + + all[0].closed?.should be_false + all[1].closed?.should be_false + pool.close + all[0].closed?.should be_true + all[1].closed?.should be_true + end + + it "should timeout" do + pool = DB::Pool.new(max_pool_size: 1, checkout_timeout: 0.1) { Closable.new } + pool.checkout + expect_raises DB::PoolTimeout do + pool.checkout + end + end + + it "should be able to release after a timeout" do + pool = DB::Pool.new(max_pool_size: 1, checkout_timeout: 0.1) { Closable.new } + a = pool.checkout + pool.checkout rescue nil + pool.release a + end + + it "should close if max idle amount is reached" do + all = [] of Closable + pool = DB::Pool.new(max_pool_size: 3, max_idle_pool_size: 1) { Closable.new.tap { |c| all << c } } + pool.checkout + pool.checkout + pool.checkout + + all.size.should eq 3 + all.any?(&.closed?).should be_false + pool.release all[0] + + all.any?(&.closed?).should be_false + pool.release all[1] + + all[0].closed?.should be_false + all[1].closed?.should be_true + all[2].closed?.should be_false + end + + it "should create resource after max_pool was reached if idle forced some close up" do + all = [] of Closable + pool = DB::Pool.new(max_pool_size: 3, max_idle_pool_size: 1) { Closable.new.tap { |c| all << c } } + pool.checkout + pool.checkout + pool.checkout + pool.release all[0] + pool.release all[1] + pool.checkout + pool.checkout + + all.size.should eq 4 + end +end diff --git a/spec/statement_spec.cr b/spec/statement_spec.cr index 196c2ed..ad7b4b7 100644 --- a/spec/statement_spec.cr +++ b/spec/statement_spec.cr @@ -2,14 +2,14 @@ require "./spec_helper" describe DB::Statement do it "should prepare statements" do - with_dummy do |db| - db.prepare("the query").should be_a(DB::Statement) + with_dummy_connection do |cnn| + cnn.prepare("the query").should be_a(DB::Statement) end end it "should initialize positional params in query" do - with_dummy do |db| - stmt = db.prepare("the query").as(DummyDriver::DummyStatement) + with_dummy_connection do |cnn| + stmt = cnn.prepare("the query").as(DummyDriver::DummyStatement) stmt.query "a", 1, nil stmt.params[0].should eq("a") stmt.params[1].should eq(1) @@ -18,8 +18,8 @@ describe DB::Statement do end it "should initialize positional params in query with array" do - with_dummy do |db| - stmt = db.prepare("the query").as(DummyDriver::DummyStatement) + with_dummy_connection do |cnn| + stmt = cnn.prepare("the query").as(DummyDriver::DummyStatement) stmt.query ["a", 1, nil] stmt.params[0].should eq("a") stmt.params[1].should eq(1) @@ -28,8 +28,8 @@ describe DB::Statement do end it "should initialize positional params in exec" do - with_dummy do |db| - stmt = db.prepare("the query").as(DummyDriver::DummyStatement) + with_dummy_connection do |cnn| + stmt = cnn.prepare("the query").as(DummyDriver::DummyStatement) stmt.exec "a", 1, nil stmt.params[0].should eq("a") stmt.params[1].should eq(1) @@ -38,8 +38,8 @@ describe DB::Statement do end it "should initialize positional params in exec with array" do - with_dummy do |db| - stmt = db.prepare("the query").as(DummyDriver::DummyStatement) + with_dummy_connection do |cnn| + stmt = cnn.prepare("the query").as(DummyDriver::DummyStatement) stmt.exec ["a", 1, nil] stmt.params[0].should eq("a") stmt.params[1].should eq(1) @@ -48,8 +48,8 @@ describe DB::Statement do end it "should initialize positional params in scalar" do - with_dummy do |db| - stmt = db.prepare("the query").as(DummyDriver::DummyStatement) + with_dummy_connection do |cnn| + stmt = cnn.prepare("the query").as(DummyDriver::DummyStatement) stmt.scalar "a", 1, nil stmt.params[0].should eq("a") stmt.params[1].should eq(1) @@ -58,8 +58,8 @@ describe DB::Statement do end it "query with block should not close statement" do - with_dummy do |db| - stmt = db.prepare "3,4 1,2" + with_dummy_connection do |cnn| + stmt = cnn.prepare "3,4 1,2" stmt.query stmt.closed?.should be_false end @@ -67,16 +67,16 @@ describe DB::Statement do it "closing connection should close statement" do stmt = uninitialized DB::Statement - with_dummy do |db| - stmt = db.prepare "3,4 1,2" + with_dummy_connection do |cnn| + stmt = cnn.prepare "3,4 1,2" stmt.query end stmt.closed?.should be_true end it "query with block should not close statement" do - with_dummy do |db| - stmt = db.prepare "3,4 1,2" + with_dummy_connection do |cnn| + stmt = cnn.prepare "3,4 1,2" stmt.query do |rs| end stmt.closed?.should be_false @@ -84,8 +84,8 @@ describe DB::Statement do end it "query should not close statement" do - with_dummy do |db| - stmt = db.prepare "3,4 1,2" + with_dummy_connection do |cnn| + stmt = cnn.prepare "3,4 1,2" stmt.query do |rs| end stmt.closed?.should be_false @@ -93,28 +93,28 @@ describe DB::Statement do end it "scalar should not close statement" do - with_dummy do |db| - stmt = db.prepare "3,4 1,2" + with_dummy_connection do |cnn| + stmt = cnn.prepare "3,4 1,2" stmt.scalar stmt.closed?.should be_false end end it "exec should not close statement" do - with_dummy do |db| - stmt = db.prepare "3,4 1,2" + with_dummy_connection do |cnn| + stmt = cnn.prepare "3,4 1,2" stmt.exec stmt.closed?.should be_false end end it "connection should cache statements by query" do - with_dummy do |db| - rs = db.query "1, ?", 2 + with_dummy_connection do |cnn| + rs = cnn.query "1, ?", 2 stmt = rs.statement rs.close - rs = db.query "1, ?", 4 + rs = cnn.query "1, ?", 4 rs.statement.should be(stmt) end end @@ -124,7 +124,8 @@ describe DB::Statement do expect_raises do db.exec "raise" end - db.@in_pool.should be_true + DummyDriver::DummyConnection.connections.size.should eq(1) + db.pool.is_available?(DummyDriver::DummyConnection.connections.first) end end end diff --git a/src/db.cr b/src/db.cr index 3541099..f474cef 100644 --- a/src/db.cr +++ b/src/db.cr @@ -6,8 +6,7 @@ require "uri" # # Drivers implementors check `Driver` class. # -# Currently a *single connection* to the database is stablished. -# In the future a connection pool and transaction support will be available. +# DB manage a connection pool. The connection pool can be configured by `URI` query. See `Database`. # # ### Usage # @@ -119,12 +118,15 @@ module DB end end +require "./db/pool" +require "./db/string_key_cache" require "./db/query_methods" require "./db/disposable" require "./db/database" require "./db/driver" require "./db/connection" require "./db/statement" +require "./db/pool_statement" require "./db/result_set" require "./db/error" require "./db/mapping" diff --git a/src/db/connection.cr b/src/db/connection.cr index 9924ec7..d6a8ee2 100644 --- a/src/db/connection.cr +++ b/src/db/connection.cr @@ -1,8 +1,4 @@ module DB - class Database; end - - abstract class Statement; end - # Database driver implementors must subclass `Connection`. # # Represents one active connection to a database. @@ -18,32 +14,31 @@ module DB # Override `#build_statement` method in order to return a prepared `Statement` to allow querying. # See also `Statement` to define how the statements are executed. # + # If at any give moment the connection is lost a DB::ConnectionLost should be raised. This will + # allow the connection pool to try to reconnect or use another connection if available. + # abstract class Connection include Disposable include QueryMethods # :nodoc: getter database - @statements_cache = {} of String => Statement + @statements_cache = StringKeyCache(Statement).new def initialize(@database : Database) end # :nodoc: def prepare(query) : Statement - stmt = @statements_cache.fetch(query, nil) - stmt = @statements_cache[query] = build_statement(query) unless stmt - - stmt + @statements_cache.fetch(query) { build_statement(query) } end abstract def build_statement(query) : Statement protected def do_close - @statements_cache.each do |_, stmt| - stmt.close - end + @statements_cache.each_value &.close @statements_cache.clear + @database.pool.delete self end end end diff --git a/src/db/database.cr b/src/db/database.cr index a7da85d..8920c0c 100644 --- a/src/db/database.cr +++ b/src/db/database.cr @@ -1,7 +1,17 @@ +require "http/params" +require "weak_ref" + module DB # Acts as an entry point for database access. - # Currently it creates a single connection to the database. - # Eventually a connection pool will be handled. + # Connections are managed by a pool. + # The connection pool can be configured from URI parameters: + # + # - initial_pool_size (default 1) + # - max_pool_size (default 1) + # - max_idle_pool_size (default 1) + # - checkout_timeout (default 5.0) + # - retry_attempts (default 1) + # - retry_delay (in seconds, default 1.0) # # It should be created from DB module. See `DB#open`. # @@ -9,56 +19,77 @@ module DB class Database # :nodoc: getter driver + # :nodoc: + getter pool # Returns the uri with the connection settings to the database getter uri - @connection : Connection? + @pool : Pool(Connection) + @setup_connection : Connection -> Nil + @statements_cache = StringKeyCache(PoolStatement).new # :nodoc: def initialize(@driver : Driver, @uri : URI) - @in_pool = true - @connection = @driver.build_connection(self) + params = HTTP::Params.parse(uri.query || "") + pool_options = @driver.connection_pool_options(params) + + @setup_connection = ->(conn : Connection) {} + @pool = uninitialized Pool(Connection) # in order to use self in the factory proc + @pool = Pool.new(**pool_options) { + conn = @driver.build_connection(self).as(Connection) + @setup_connection.call conn + conn + } + end + + def setup_connection(&proc : Connection -> Nil) + @setup_connection = proc + @pool.each_resource do |conn| + @setup_connection.call conn + end end # Closes all connection to the database. def close - @connection.try &.close - # prevent GC Warning: Finalization cycle involving discovered by mysql implementation - @connection = nil + @statements_cache.each_value &.close + @statements_cache.clear + + @pool.close end # :nodoc: def prepare(query) - conn = get_from_pool - begin - conn.prepare(query) - rescue ex - return_to_pool(conn) - raise ex - end + @statements_cache.fetch(query) { PoolStatement.new(self, query) } end # :nodoc: - def get_from_pool - raise "DB Pool Exhausted" unless @in_pool - @in_pool = false - @connection.not_nil! + def checkout_some(candidates : Enumerable(WeakRef(Connection))) : {Connection, Bool} + @pool.checkout_some candidates end # :nodoc: def return_to_pool(connection) - @in_pool = true + @pool.release connection end # yields a connection from the pool # the connection is returned to the pool after # when the block ends def using_connection - connection = get_from_pool - yield connection - ensure - return_to_pool connection + connection = @pool.checkout + begin + yield connection + ensure + return_to_pool connection + end + end + + # :nodoc: + def retry + @pool.retry do + yield + end end include QueryMethods diff --git a/src/db/driver.cr b/src/db/driver.cr index 49fc6e7..5715698 100644 --- a/src/db/driver.cr +++ b/src/db/driver.cr @@ -27,5 +27,16 @@ module DB # driver implementation instructions. abstract class Driver abstract def build_connection(db : Database) : Connection + + def connection_pool_options(params : HTTP::Params) + { + initial_pool_size: params.fetch("initial_pool_size", 1).to_i, + max_pool_size: params.fetch("max_pool_size", 1).to_i, + max_idle_pool_size: params.fetch("max_idle_pool_size", 1).to_i, + checkout_timeout: params.fetch("checkout_timeout", 5.0).to_f, + retry_attempts: params.fetch("retry_attempts", 1).to_i, + retry_delay: params.fetch("retry_delay", 1.0).to_f, + } + end end end diff --git a/src/db/error.cr b/src/db/error.cr index 0d64661..670effc 100644 --- a/src/db/error.cr +++ b/src/db/error.cr @@ -4,4 +4,17 @@ module DB class MappingException < Exception end + + class PoolTimeout < Error + end + + class PoolRetryAttemptsExceeded < Error + end + + class ConnectionLost < Error + getter connection : Connection + + def initialize(@connection) + end + end end diff --git a/src/db/pool.cr b/src/db/pool.cr new file mode 100644 index 0000000..15ca7f1 --- /dev/null +++ b/src/db/pool.cr @@ -0,0 +1,197 @@ +require "weak_ref" + +module DB + class Pool(T) + @initial_pool_size : Int32 + # maximum amount of objects in the pool. Either available or in use. + @max_pool_size : Int32 + @available = Set(T).new + @total = [] of T + @checkout_timeout : Float64 + # maximum amount of retry attempts to reconnect to the db. See `Pool#retry`. + @retry_attempts : Int32 + @retry_delay : Float64 + + def initialize(@initial_pool_size = 1, @max_pool_size = 1, @max_idle_pool_size = 1, @checkout_timeout = 5.0, + @retry_attempts = 1, @retry_delay = 0.2, &@factory : -> T) + @initial_pool_size.times { build_resource } + + @availability_channel = Channel(Nil).new + @waiting_resource = 0 + @mutex = Mutex.new + end + + # close all resources in the pool + def close : Nil + @total.each &.close + @total.clear + @available.clear + end + + def checkout : T + resource = if @available.empty? + if can_increase_pool + build_resource + else + wait_for_available + pick_available + end + else + pick_available + end + + @available.delete resource + resource + end + + # ``` + # selected, is_candidate = pool.checkout_some(candidates) + # ``` + # `selected` be a resource from the `candidates` list and `is_candidate` == `true` + # or `selected` will be a new resource adn `is_candidate` == `false` + def checkout_some(candidates : Enumerable(WeakRef(T))) : {T, Bool} + # TODO honor candidates while waiting for availables + # this will allow us to remove `candidates.includes?(resource)` + candidates.each do |ref| + resource = ref.target + if resource && is_available?(resource) + @available.delete resource + return {resource, true} + end + end + + resource = checkout + {resource, candidates.any? { |ref| ref.target == resource }} + end + + def release(resource : T) : Nil + if can_increase_idle_pool + @available << resource + @availability_channel.send nil if are_waiting_for_resource? + else + resource.close + @total.delete(resource) + end + end + + # :nodoc: + # Will retry the block if a `ConnectionLost` exception is thrown. + # It will try to reuse all of the available connection right away, + # but if a new connection is needed there is a `retry_delay` seconds delay. + def retry + current_available = @available.size + + (current_available + @retry_attempts).times do |i| + begin + sleep @retry_delay if i >= current_available + return yield + rescue e : ConnectionLost + # if the connection is lost close it to release resources + # and remove it from the known pool. + delete(e.connection) + e.connection.close + end + end + raise PoolRetryAttemptsExceeded.new + end + + # :nodoc: + def each_resource + @available.each do |resource| + yield resource + end + end + + # :nodoc: + def is_available?(resource : T) + @available.includes?(resource) + end + + # :nodoc: + def delete(resource : T) + @total.delete(resource) + @available.delete(resource) + end + + private def build_resource : T + resource = @factory.call + @total << resource + @available << resource + resource + end + + private def can_increase_pool + @total.size < @max_pool_size + end + + private def can_increase_idle_pool + @available.size < @max_idle_pool_size + end + + private def pick_available + @available.first + end + + private def wait_for_available + timeout = TimeoutHelper.new(@checkout_timeout.to_f64) + inc_waiting_resource + + timeout.start + + # TODO update to select keyword for crystal 0.19 + index, _ = Channel.select(@availability_channel.receive_select_action, timeout.receive_select_action) + case index + when 0 + timeout.cancel + dec_waiting_resource + when 1 + dec_waiting_resource + raise DB::PoolTimeout.new + else + raise DB::Error.new + end + end + + private def inc_waiting_resource + @mutex.synchronize do + @waiting_resource += 1 + end + end + + private def dec_waiting_resource + @mutex.synchronize do + @waiting_resource -= 1 + end + end + + private def are_waiting_for_resource? + @mutex.synchronize do + @waiting_resource > 0 + end + end + + class TimeoutHelper + def initialize(@timeout : Float64) + @abort_timeout = false + @timeout_channel = Channel(Nil).new + end + + def receive_select_action + @timeout_channel.receive_select_action + end + + def start + spawn do + sleep @timeout + unless @abort_timeout + @timeout_channel.send nil + end + end + end + + def cancel + @abort_timeout = true + end + end + end +end diff --git a/src/db/pool_statement.cr b/src/db/pool_statement.cr new file mode 100644 index 0000000..6e2354e --- /dev/null +++ b/src/db/pool_statement.cr @@ -0,0 +1,86 @@ +module DB + # When a statement is to be executed in a DB that has a connection pool + # a statement from the DB needs to be able to represent a statement in any + # of the connections of the pool. Otherwise the user will need to deal with + # actual connections in some point. + class PoolStatement + include StatementMethods + + # connections where the statement was prepared + @connections = Set(WeakRef(Connection)).new + + def initialize(@db : Database, @query : String) + # Prepares a statement on some connection + # otherwise the preparation is delayed until the first execution. + # After the first initialization the connection must be released + # it will be checked out when executing it. + statement_with_retry &.release_connection + # TODO use a round-robin selection in the pool so multiple sequentially + # initialized statements are assigned to different connections. + end + + protected def do_close + # TODO close all statements on all connections. + # currently statements are closed when the connection is closed. + + # WHAT-IF the connection is busy? Should each statement be able to + # deallocate itself when the connection is free. + @connections.clear + end + + # See `QueryMethods#exec` + def exec : ExecResult + statement_with_retry &.exec + end + + # See `QueryMethods#exec` + def exec(*args) : ExecResult + statement_with_retry &.exec(*args) + end + + # See `QueryMethods#exec` + def exec(args : Array) : ExecResult + statement_with_retry &.exec(args) + end + + # See `QueryMethods#query` + def query : ResultSet + statement_with_retry &.query + end + + # See `QueryMethods#query` + def query(*args) : ResultSet + statement_with_retry &.query(*args) + end + + # See `QueryMethods#query` + def query(args : Array) : ResultSet + statement_with_retry &.query(args) + end + + # builds a statement over a real connection + # the conneciton is registered in `@connections` + private def build_statement + clean_connections + conn, existing = @db.checkout_some(@connections) + @connections << WeakRef.new(conn) unless existing + conn.prepare(@query) + end + + private def clean_connections + # remove disposed or closed connections + @connections.each do |ref| + conn = ref.target + if !conn || conn.closed? + @connections.delete ref + end + end + end + + private def statement_with_retry + @db.retry do + return yield build_statement + end + end + end +end diff --git a/src/db/statement.cr b/src/db/statement.cr index 45a6997..f6573f0 100644 --- a/src/db/statement.cr +++ b/src/db/statement.cr @@ -1,4 +1,44 @@ module DB + # Common interface for connection based statements + # and for connection pool statements. + module StatementMethods + include Disposable + + protected def do_close + end + + # See `QueryMethods#scalar` + def scalar(*args) + query(*args) do |rs| + rs.each do + return rs.read + end + end + + raise "no results" + end + + # See `QueryMethods#query` + def query(*args) + rs = query(*args) + yield rs ensure rs.close + end + + # See `QueryMethods#exec` + abstract def exec : ExecResult + # See `QueryMethods#exec` + abstract def exec(*args) : ExecResult + # See `QueryMethods#exec` + abstract def exec(args : Array) : ExecResult + + # See `QueryMethods#query` + abstract def query : ResultSet + # See `QueryMethods#query` + abstract def query(*args) : ResultSet + # See `QueryMethods#query` + abstract def query(args : Array) : ResultSet + end + # Represents a prepared query in a `Connection`. # It should be created by `QueryMethods`. # @@ -10,7 +50,7 @@ module DB # 4. `#perform_exec` executes a query that is expected to return an `ExecResult` # 6. `#do_close` is called to release the statement resources. abstract class Statement - include Disposable + include StatementMethods # :nodoc: getter connection @@ -18,9 +58,6 @@ module DB def initialize(@connection : Connection) end - protected def do_close - end - def release_connection @connection.database.return_to_pool(@connection) end @@ -41,17 +78,6 @@ module DB perform_exec_and_release(args) end - # See `QueryMethods#scalar` - def scalar(*args) - query(*args) do |rs| - rs.each do - return rs.read - end - end - - raise "no results" - end - # See `QueryMethods#query` def query perform_query Tuple.new @@ -67,12 +93,6 @@ module DB perform_query args end - # See `QueryMethods#query` - def query(*args) - rs = query(*args) - yield rs ensure rs.close - end - private def perform_exec_and_release(args : Enumerable) : ExecResult return perform_exec(args) ensure diff --git a/src/db/string_key_cache.cr b/src/db/string_key_cache.cr new file mode 100644 index 0000000..f2cae62 --- /dev/null +++ b/src/db/string_key_cache.cr @@ -0,0 +1,21 @@ +module DB + class StringKeyCache(T) + @cache = {} of String => T + + def fetch(key : String) : T + value = @cache.fetch(key, nil) + value = @cache[key] = yield unless value + value + end + + def each_value + @cache.each do |_, value| + yield value + end + end + + def clear + @cache.clear + end + end +end