Merge branch 'feature/pool'

Conflicts:
	src/db/error.cr
This commit is contained in:
Brian J. Cardiff 2016-09-13 01:42:24 -03:00
commit 751be7aa6a
14 changed files with 767 additions and 89 deletions

90
spec/database_spec.cr Normal file
View File

@ -0,0 +1,90 @@
require "./spec_helper"
describe DB::Database do
it "allows connection initialization" do
cnn_setup = 0
DB.open "dummy://localhost:1027?initial_pool_size=2&max_pool_size=4&max_idle_pool_size=1" do |db|
cnn_setup.should eq(0)
db.setup_connection do |cnn|
cnn_setup += 1
end
cnn_setup.should eq(2)
db.using_connection do
cnn_setup.should eq(2)
db.using_connection do
cnn_setup.should eq(2)
db.using_connection do
cnn_setup.should eq(3)
db.using_connection do
cnn_setup.should eq(4)
end
# the pool didn't shrink no new initialization should be done next
db.using_connection do
cnn_setup.should eq(4)
end
end
# the pool shrink 1. max_idle_pool_size=1
# after the previous end there where 2.
db.using_connection do
cnn_setup.should eq(4)
# so now there will be a new connection created
db.using_connection do
cnn_setup.should eq(5)
end
end
end
end
end
end
it "should allow creation of more statements than pool connections" do
DB.open "dummy://localhost:1027?initial_pool_size=1&max_pool_size=2" do |db|
db.prepare("query1").should be_a(DB::PoolStatement)
db.prepare("query2").should be_a(DB::PoolStatement)
db.prepare("query3").should be_a(DB::PoolStatement)
end
end
it "should return same statement in pool per query" do
with_dummy do |db|
stmt = db.prepare("query1")
db.prepare("query2").should_not eq(stmt)
db.prepare("query1").should eq(stmt)
end
end
it "should close pool statements when closing db" do
stmt = uninitialized DB::PoolStatement
with_dummy do |db|
stmt = db.prepare("query1")
end
stmt.closed?.should be_true
end
it "should not reconnect if connection is lost and retry_attempts=0" do
DummyDriver::DummyConnection.clear_connections
DB.open "dummy://localhost:1027?initial_pool_size=1&max_pool_size=1&retry_attempts=0" do |db|
db.exec("stmt1")
DummyDriver::DummyConnection.connections.size.should eq(1)
DummyDriver::DummyConnection.connections.first.disconnect!
expect_raises DB::PoolRetryAttemptsExceeded do
db.exec("stmt1")
end
DummyDriver::DummyConnection.connections.size.should eq(1)
end
end
it "should reconnect if connection is lost and executing same statement" do
DummyDriver::DummyConnection.clear_connections
DB.open "dummy://localhost:1027?initial_pool_size=1&max_pool_size=1&retry_attempts=1" do |db|
db.exec("stmt1")
DummyDriver::DummyConnection.connections.size.should eq(1)
DummyDriver::DummyConnection.connections.first.disconnect!
db.exec("stmt1")
DummyDriver::DummyConnection.connections.size.should eq(2)
end
end
end

View File

@ -61,7 +61,7 @@ describe DB do
it "should raise if the sole connection is been used" do
with_dummy do |db|
db.query "1" do |rs|
expect_raises Exception, /DB Pool Exhausted/ do
expect_raises DB::PoolTimeout do
db.scalar "2"
end
end

View File

@ -9,6 +9,7 @@ class DummyDriver < DB::Driver
class DummyConnection < DB::Connection
def initialize(db)
super(db)
@connected = true
@@connections ||= [] of DummyConnection
@@connections.not_nil! << self
end
@ -29,6 +30,14 @@ class DummyDriver < DB::Driver
0
end
def check
raise DB::ConnectionLost.new(self) unless @connected
end
def disconnect!
@connected = false
end
protected def do_close
super
end
@ -43,11 +52,13 @@ class DummyDriver < DB::Driver
end
protected def perform_query(args : Enumerable)
@connection.as(DummyConnection).check
set_params args
DummyResultSet.new self, @query
end
protected def perform_exec(args : Enumerable)
@connection.as(DummyConnection).check
set_params args
raise "forced exception due to query" if @query == "raise"
DB::ExecResult.new 0i64, 0_i64
@ -188,7 +199,15 @@ end
def with_dummy
DummyDriver::DummyConnection.clear_connections
DB.open "dummy://host" do |db|
DB.open "dummy://host?checkout_timeout=0.5" do |db|
yield db
end
end
def with_dummy_connection
with_dummy do |db|
db.using_connection do |cnn|
yield cnn.as(DummyDriver::DummyConnection)
end
end
end

192
spec/pool_spec.cr Normal file
View File

@ -0,0 +1,192 @@
require "./spec_helper"
class ShouldSleepingOp
@is_sleeping = false
getter is_sleeping
getter sleep_happened
def initialize
@sleep_happened = Channel(Nil).new
end
def should_sleep
s = self
@is_sleeping = true
spawn do
sleep 0.1
s.is_sleeping.should be_true
s.sleep_happened.send(nil)
end
yield
@is_sleeping = false
end
def wait_for_sleep
@sleep_happened.receive
end
end
class WaitFor
def initialize
@channel = Channel(Nil).new
end
def wait
@channel.receive
end
def check
@channel.send(nil)
end
end
class Closable
include DB::Disposable
protected def do_close
end
end
describe DB::Pool do
it "should use proc to create objects" do
block_called = 0
pool = DB::Pool.new(initial_pool_size: 3) { block_called += 1; Closable.new }
block_called.should eq(3)
end
it "should get resource" do
pool = DB::Pool.new { Closable.new }
pool.checkout.should be_a Closable
end
it "should be available if not checkedout" do
resource = uninitialized Closable
pool = DB::Pool.new(initial_pool_size: 1) { resource = Closable.new }
pool.is_available?(resource).should be_true
end
it "should not be available if checkedout" do
pool = DB::Pool.new { Closable.new }
resource = pool.checkout
pool.is_available?(resource).should be_false
end
it "should be available if returned" do
pool = DB::Pool.new { Closable.new }
resource = pool.checkout
pool.release resource
pool.is_available?(resource).should be_true
end
it "should wait for available resource" do
pool = DB::Pool.new(max_pool_size: 1, initial_pool_size: 1) { Closable.new }
b_cnn_request = ShouldSleepingOp.new
wait_a = WaitFor.new
wait_b = WaitFor.new
spawn do
a_cnn = pool.checkout
b_cnn_request.wait_for_sleep
pool.release a_cnn
wait_a.check
end
spawn do
b_cnn_request.should_sleep do
pool.checkout
end
wait_b.check
end
wait_a.wait
wait_b.wait
end
it "should create new if max was not reached" do
block_called = 0
pool = DB::Pool.new(max_pool_size: 2, initial_pool_size: 1) { block_called += 1; Closable.new }
block_called.should eq 1
pool.checkout
block_called.should eq 1
pool.checkout
block_called.should eq 2
end
it "should reuse returned resources" do
all = [] of Closable
pool = DB::Pool.new(max_pool_size: 2, initial_pool_size: 1) { Closable.new.tap { |c| all << c } }
pool.checkout
b1 = pool.checkout
pool.release b1
b2 = pool.checkout
b1.should eq b2
all.size.should eq 2
end
it "should close available and total" do
all = [] of Closable
pool = DB::Pool.new(max_pool_size: 2, initial_pool_size: 1) { Closable.new.tap { |c| all << c } }
a = pool.checkout
b = pool.checkout
pool.release b
all.size.should eq 2
all[0].closed?.should be_false
all[1].closed?.should be_false
pool.close
all[0].closed?.should be_true
all[1].closed?.should be_true
end
it "should timeout" do
pool = DB::Pool.new(max_pool_size: 1, checkout_timeout: 0.1) { Closable.new }
pool.checkout
expect_raises DB::PoolTimeout do
pool.checkout
end
end
it "should be able to release after a timeout" do
pool = DB::Pool.new(max_pool_size: 1, checkout_timeout: 0.1) { Closable.new }
a = pool.checkout
pool.checkout rescue nil
pool.release a
end
it "should close if max idle amount is reached" do
all = [] of Closable
pool = DB::Pool.new(max_pool_size: 3, max_idle_pool_size: 1) { Closable.new.tap { |c| all << c } }
pool.checkout
pool.checkout
pool.checkout
all.size.should eq 3
all.any?(&.closed?).should be_false
pool.release all[0]
all.any?(&.closed?).should be_false
pool.release all[1]
all[0].closed?.should be_false
all[1].closed?.should be_true
all[2].closed?.should be_false
end
it "should create resource after max_pool was reached if idle forced some close up" do
all = [] of Closable
pool = DB::Pool.new(max_pool_size: 3, max_idle_pool_size: 1) { Closable.new.tap { |c| all << c } }
pool.checkout
pool.checkout
pool.checkout
pool.release all[0]
pool.release all[1]
pool.checkout
pool.checkout
all.size.should eq 4
end
end

View File

@ -2,14 +2,14 @@ require "./spec_helper"
describe DB::Statement do
it "should prepare statements" do
with_dummy do |db|
db.prepare("the query").should be_a(DB::Statement)
with_dummy_connection do |cnn|
cnn.prepare("the query").should be_a(DB::Statement)
end
end
it "should initialize positional params in query" do
with_dummy do |db|
stmt = db.prepare("the query").as(DummyDriver::DummyStatement)
with_dummy_connection do |cnn|
stmt = cnn.prepare("the query").as(DummyDriver::DummyStatement)
stmt.query "a", 1, nil
stmt.params[0].should eq("a")
stmt.params[1].should eq(1)
@ -18,8 +18,8 @@ describe DB::Statement do
end
it "should initialize positional params in query with array" do
with_dummy do |db|
stmt = db.prepare("the query").as(DummyDriver::DummyStatement)
with_dummy_connection do |cnn|
stmt = cnn.prepare("the query").as(DummyDriver::DummyStatement)
stmt.query ["a", 1, nil]
stmt.params[0].should eq("a")
stmt.params[1].should eq(1)
@ -28,8 +28,8 @@ describe DB::Statement do
end
it "should initialize positional params in exec" do
with_dummy do |db|
stmt = db.prepare("the query").as(DummyDriver::DummyStatement)
with_dummy_connection do |cnn|
stmt = cnn.prepare("the query").as(DummyDriver::DummyStatement)
stmt.exec "a", 1, nil
stmt.params[0].should eq("a")
stmt.params[1].should eq(1)
@ -38,8 +38,8 @@ describe DB::Statement do
end
it "should initialize positional params in exec with array" do
with_dummy do |db|
stmt = db.prepare("the query").as(DummyDriver::DummyStatement)
with_dummy_connection do |cnn|
stmt = cnn.prepare("the query").as(DummyDriver::DummyStatement)
stmt.exec ["a", 1, nil]
stmt.params[0].should eq("a")
stmt.params[1].should eq(1)
@ -48,8 +48,8 @@ describe DB::Statement do
end
it "should initialize positional params in scalar" do
with_dummy do |db|
stmt = db.prepare("the query").as(DummyDriver::DummyStatement)
with_dummy_connection do |cnn|
stmt = cnn.prepare("the query").as(DummyDriver::DummyStatement)
stmt.scalar "a", 1, nil
stmt.params[0].should eq("a")
stmt.params[1].should eq(1)
@ -58,8 +58,8 @@ describe DB::Statement do
end
it "query with block should not close statement" do
with_dummy do |db|
stmt = db.prepare "3,4 1,2"
with_dummy_connection do |cnn|
stmt = cnn.prepare "3,4 1,2"
stmt.query
stmt.closed?.should be_false
end
@ -67,16 +67,16 @@ describe DB::Statement do
it "closing connection should close statement" do
stmt = uninitialized DB::Statement
with_dummy do |db|
stmt = db.prepare "3,4 1,2"
with_dummy_connection do |cnn|
stmt = cnn.prepare "3,4 1,2"
stmt.query
end
stmt.closed?.should be_true
end
it "query with block should not close statement" do
with_dummy do |db|
stmt = db.prepare "3,4 1,2"
with_dummy_connection do |cnn|
stmt = cnn.prepare "3,4 1,2"
stmt.query do |rs|
end
stmt.closed?.should be_false
@ -84,8 +84,8 @@ describe DB::Statement do
end
it "query should not close statement" do
with_dummy do |db|
stmt = db.prepare "3,4 1,2"
with_dummy_connection do |cnn|
stmt = cnn.prepare "3,4 1,2"
stmt.query do |rs|
end
stmt.closed?.should be_false
@ -93,28 +93,28 @@ describe DB::Statement do
end
it "scalar should not close statement" do
with_dummy do |db|
stmt = db.prepare "3,4 1,2"
with_dummy_connection do |cnn|
stmt = cnn.prepare "3,4 1,2"
stmt.scalar
stmt.closed?.should be_false
end
end
it "exec should not close statement" do
with_dummy do |db|
stmt = db.prepare "3,4 1,2"
with_dummy_connection do |cnn|
stmt = cnn.prepare "3,4 1,2"
stmt.exec
stmt.closed?.should be_false
end
end
it "connection should cache statements by query" do
with_dummy do |db|
rs = db.query "1, ?", 2
with_dummy_connection do |cnn|
rs = cnn.query "1, ?", 2
stmt = rs.statement
rs.close
rs = db.query "1, ?", 4
rs = cnn.query "1, ?", 4
rs.statement.should be(stmt)
end
end
@ -124,7 +124,8 @@ describe DB::Statement do
expect_raises do
db.exec "raise"
end
db.@in_pool.should be_true
DummyDriver::DummyConnection.connections.size.should eq(1)
db.pool.is_available?(DummyDriver::DummyConnection.connections.first)
end
end
end

View File

@ -6,8 +6,7 @@ require "uri"
#
# Drivers implementors check `Driver` class.
#
# Currently a *single connection* to the database is stablished.
# In the future a connection pool and transaction support will be available.
# DB manage a connection pool. The connection pool can be configured by `URI` query. See `Database`.
#
# ### Usage
#
@ -119,12 +118,15 @@ module DB
end
end
require "./db/pool"
require "./db/string_key_cache"
require "./db/query_methods"
require "./db/disposable"
require "./db/database"
require "./db/driver"
require "./db/connection"
require "./db/statement"
require "./db/pool_statement"
require "./db/result_set"
require "./db/error"
require "./db/mapping"

View File

@ -1,8 +1,4 @@
module DB
class Database; end
abstract class Statement; end
# Database driver implementors must subclass `Connection`.
#
# Represents one active connection to a database.
@ -18,32 +14,31 @@ module DB
# Override `#build_statement` method in order to return a prepared `Statement` to allow querying.
# See also `Statement` to define how the statements are executed.
#
# If at any give moment the connection is lost a DB::ConnectionLost should be raised. This will
# allow the connection pool to try to reconnect or use another connection if available.
#
abstract class Connection
include Disposable
include QueryMethods
# :nodoc:
getter database
@statements_cache = {} of String => Statement
@statements_cache = StringKeyCache(Statement).new
def initialize(@database : Database)
end
# :nodoc:
def prepare(query) : Statement
stmt = @statements_cache.fetch(query, nil)
stmt = @statements_cache[query] = build_statement(query) unless stmt
stmt
@statements_cache.fetch(query) { build_statement(query) }
end
abstract def build_statement(query) : Statement
protected def do_close
@statements_cache.each do |_, stmt|
stmt.close
end
@statements_cache.each_value &.close
@statements_cache.clear
@database.pool.delete self
end
end
end

View File

@ -1,7 +1,17 @@
require "http/params"
require "weak_ref"
module DB
# Acts as an entry point for database access.
# Currently it creates a single connection to the database.
# Eventually a connection pool will be handled.
# Connections are managed by a pool.
# The connection pool can be configured from URI parameters:
#
# - initial_pool_size (default 1)
# - max_pool_size (default 1)
# - max_idle_pool_size (default 1)
# - checkout_timeout (default 5.0)
# - retry_attempts (default 1)
# - retry_delay (in seconds, default 1.0)
#
# It should be created from DB module. See `DB#open`.
#
@ -9,56 +19,77 @@ module DB
class Database
# :nodoc:
getter driver
# :nodoc:
getter pool
# Returns the uri with the connection settings to the database
getter uri
@connection : Connection?
@pool : Pool(Connection)
@setup_connection : Connection -> Nil
@statements_cache = StringKeyCache(PoolStatement).new
# :nodoc:
def initialize(@driver : Driver, @uri : URI)
@in_pool = true
@connection = @driver.build_connection(self)
params = HTTP::Params.parse(uri.query || "")
pool_options = @driver.connection_pool_options(params)
@setup_connection = ->(conn : Connection) {}
@pool = uninitialized Pool(Connection) # in order to use self in the factory proc
@pool = Pool.new(**pool_options) {
conn = @driver.build_connection(self).as(Connection)
@setup_connection.call conn
conn
}
end
def setup_connection(&proc : Connection -> Nil)
@setup_connection = proc
@pool.each_resource do |conn|
@setup_connection.call conn
end
end
# Closes all connection to the database.
def close
@connection.try &.close
# prevent GC Warning: Finalization cycle involving discovered by mysql implementation
@connection = nil
@statements_cache.each_value &.close
@statements_cache.clear
@pool.close
end
# :nodoc:
def prepare(query)
conn = get_from_pool
begin
conn.prepare(query)
rescue ex
return_to_pool(conn)
raise ex
end
@statements_cache.fetch(query) { PoolStatement.new(self, query) }
end
# :nodoc:
def get_from_pool
raise "DB Pool Exhausted" unless @in_pool
@in_pool = false
@connection.not_nil!
def checkout_some(candidates : Enumerable(WeakRef(Connection))) : {Connection, Bool}
@pool.checkout_some candidates
end
# :nodoc:
def return_to_pool(connection)
@in_pool = true
@pool.release connection
end
# yields a connection from the pool
# the connection is returned to the pool after
# when the block ends
def using_connection
connection = get_from_pool
yield connection
ensure
return_to_pool connection
connection = @pool.checkout
begin
yield connection
ensure
return_to_pool connection
end
end
# :nodoc:
def retry
@pool.retry do
yield
end
end
include QueryMethods

View File

@ -27,5 +27,16 @@ module DB
# driver implementation instructions.
abstract class Driver
abstract def build_connection(db : Database) : Connection
def connection_pool_options(params : HTTP::Params)
{
initial_pool_size: params.fetch("initial_pool_size", 1).to_i,
max_pool_size: params.fetch("max_pool_size", 1).to_i,
max_idle_pool_size: params.fetch("max_idle_pool_size", 1).to_i,
checkout_timeout: params.fetch("checkout_timeout", 5.0).to_f,
retry_attempts: params.fetch("retry_attempts", 1).to_i,
retry_delay: params.fetch("retry_delay", 1.0).to_f,
}
end
end
end

View File

@ -4,4 +4,17 @@ module DB
class MappingException < Exception
end
class PoolTimeout < Error
end
class PoolRetryAttemptsExceeded < Error
end
class ConnectionLost < Error
getter connection : Connection
def initialize(@connection)
end
end
end

197
src/db/pool.cr Normal file
View File

@ -0,0 +1,197 @@
require "weak_ref"
module DB
class Pool(T)
@initial_pool_size : Int32
# maximum amount of objects in the pool. Either available or in use.
@max_pool_size : Int32
@available = Set(T).new
@total = [] of T
@checkout_timeout : Float64
# maximum amount of retry attempts to reconnect to the db. See `Pool#retry`.
@retry_attempts : Int32
@retry_delay : Float64
def initialize(@initial_pool_size = 1, @max_pool_size = 1, @max_idle_pool_size = 1, @checkout_timeout = 5.0,
@retry_attempts = 1, @retry_delay = 0.2, &@factory : -> T)
@initial_pool_size.times { build_resource }
@availability_channel = Channel(Nil).new
@waiting_resource = 0
@mutex = Mutex.new
end
# close all resources in the pool
def close : Nil
@total.each &.close
@total.clear
@available.clear
end
def checkout : T
resource = if @available.empty?
if can_increase_pool
build_resource
else
wait_for_available
pick_available
end
else
pick_available
end
@available.delete resource
resource
end
# ```
# selected, is_candidate = pool.checkout_some(candidates)
# ```
# `selected` be a resource from the `candidates` list and `is_candidate` == `true`
# or `selected` will be a new resource adn `is_candidate` == `false`
def checkout_some(candidates : Enumerable(WeakRef(T))) : {T, Bool}
# TODO honor candidates while waiting for availables
# this will allow us to remove `candidates.includes?(resource)`
candidates.each do |ref|
resource = ref.target
if resource && is_available?(resource)
@available.delete resource
return {resource, true}
end
end
resource = checkout
{resource, candidates.any? { |ref| ref.target == resource }}
end
def release(resource : T) : Nil
if can_increase_idle_pool
@available << resource
@availability_channel.send nil if are_waiting_for_resource?
else
resource.close
@total.delete(resource)
end
end
# :nodoc:
# Will retry the block if a `ConnectionLost` exception is thrown.
# It will try to reuse all of the available connection right away,
# but if a new connection is needed there is a `retry_delay` seconds delay.
def retry
current_available = @available.size
(current_available + @retry_attempts).times do |i|
begin
sleep @retry_delay if i >= current_available
return yield
rescue e : ConnectionLost
# if the connection is lost close it to release resources
# and remove it from the known pool.
delete(e.connection)
e.connection.close
end
end
raise PoolRetryAttemptsExceeded.new
end
# :nodoc:
def each_resource
@available.each do |resource|
yield resource
end
end
# :nodoc:
def is_available?(resource : T)
@available.includes?(resource)
end
# :nodoc:
def delete(resource : T)
@total.delete(resource)
@available.delete(resource)
end
private def build_resource : T
resource = @factory.call
@total << resource
@available << resource
resource
end
private def can_increase_pool
@total.size < @max_pool_size
end
private def can_increase_idle_pool
@available.size < @max_idle_pool_size
end
private def pick_available
@available.first
end
private def wait_for_available
timeout = TimeoutHelper.new(@checkout_timeout.to_f64)
inc_waiting_resource
timeout.start
# TODO update to select keyword for crystal 0.19
index, _ = Channel.select(@availability_channel.receive_select_action, timeout.receive_select_action)
case index
when 0
timeout.cancel
dec_waiting_resource
when 1
dec_waiting_resource
raise DB::PoolTimeout.new
else
raise DB::Error.new
end
end
private def inc_waiting_resource
@mutex.synchronize do
@waiting_resource += 1
end
end
private def dec_waiting_resource
@mutex.synchronize do
@waiting_resource -= 1
end
end
private def are_waiting_for_resource?
@mutex.synchronize do
@waiting_resource > 0
end
end
class TimeoutHelper
def initialize(@timeout : Float64)
@abort_timeout = false
@timeout_channel = Channel(Nil).new
end
def receive_select_action
@timeout_channel.receive_select_action
end
def start
spawn do
sleep @timeout
unless @abort_timeout
@timeout_channel.send nil
end
end
end
def cancel
@abort_timeout = true
end
end
end
end

86
src/db/pool_statement.cr Normal file
View File

@ -0,0 +1,86 @@
module DB
# When a statement is to be executed in a DB that has a connection pool
# a statement from the DB needs to be able to represent a statement in any
# of the connections of the pool. Otherwise the user will need to deal with
# actual connections in some point.
class PoolStatement
include StatementMethods
# connections where the statement was prepared
@connections = Set(WeakRef(Connection)).new
def initialize(@db : Database, @query : String)
# Prepares a statement on some connection
# otherwise the preparation is delayed until the first execution.
# After the first initialization the connection must be released
# it will be checked out when executing it.
statement_with_retry &.release_connection
# TODO use a round-robin selection in the pool so multiple sequentially
# initialized statements are assigned to different connections.
end
protected def do_close
# TODO close all statements on all connections.
# currently statements are closed when the connection is closed.
# WHAT-IF the connection is busy? Should each statement be able to
# deallocate itself when the connection is free.
@connections.clear
end
# See `QueryMethods#exec`
def exec : ExecResult
statement_with_retry &.exec
end
# See `QueryMethods#exec`
def exec(*args) : ExecResult
statement_with_retry &.exec(*args)
end
# See `QueryMethods#exec`
def exec(args : Array) : ExecResult
statement_with_retry &.exec(args)
end
# See `QueryMethods#query`
def query : ResultSet
statement_with_retry &.query
end
# See `QueryMethods#query`
def query(*args) : ResultSet
statement_with_retry &.query(*args)
end
# See `QueryMethods#query`
def query(args : Array) : ResultSet
statement_with_retry &.query(args)
end
# builds a statement over a real connection
# the conneciton is registered in `@connections`
private def build_statement
clean_connections
conn, existing = @db.checkout_some(@connections)
@connections << WeakRef.new(conn) unless existing
conn.prepare(@query)
end
private def clean_connections
# remove disposed or closed connections
@connections.each do |ref|
conn = ref.target
if !conn || conn.closed?
@connections.delete ref
end
end
end
private def statement_with_retry
@db.retry do
return yield build_statement
end
end
end
end

View File

@ -1,4 +1,44 @@
module DB
# Common interface for connection based statements
# and for connection pool statements.
module StatementMethods
include Disposable
protected def do_close
end
# See `QueryMethods#scalar`
def scalar(*args)
query(*args) do |rs|
rs.each do
return rs.read
end
end
raise "no results"
end
# See `QueryMethods#query`
def query(*args)
rs = query(*args)
yield rs ensure rs.close
end
# See `QueryMethods#exec`
abstract def exec : ExecResult
# See `QueryMethods#exec`
abstract def exec(*args) : ExecResult
# See `QueryMethods#exec`
abstract def exec(args : Array) : ExecResult
# See `QueryMethods#query`
abstract def query : ResultSet
# See `QueryMethods#query`
abstract def query(*args) : ResultSet
# See `QueryMethods#query`
abstract def query(args : Array) : ResultSet
end
# Represents a prepared query in a `Connection`.
# It should be created by `QueryMethods`.
#
@ -10,7 +50,7 @@ module DB
# 4. `#perform_exec` executes a query that is expected to return an `ExecResult`
# 6. `#do_close` is called to release the statement resources.
abstract class Statement
include Disposable
include StatementMethods
# :nodoc:
getter connection
@ -18,9 +58,6 @@ module DB
def initialize(@connection : Connection)
end
protected def do_close
end
def release_connection
@connection.database.return_to_pool(@connection)
end
@ -41,17 +78,6 @@ module DB
perform_exec_and_release(args)
end
# See `QueryMethods#scalar`
def scalar(*args)
query(*args) do |rs|
rs.each do
return rs.read
end
end
raise "no results"
end
# See `QueryMethods#query`
def query
perform_query Tuple.new
@ -67,12 +93,6 @@ module DB
perform_query args
end
# See `QueryMethods#query`
def query(*args)
rs = query(*args)
yield rs ensure rs.close
end
private def perform_exec_and_release(args : Enumerable) : ExecResult
return perform_exec(args)
ensure

View File

@ -0,0 +1,21 @@
module DB
class StringKeyCache(T)
@cache = {} of String => T
def fetch(key : String) : T
value = @cache.fetch(key, nil)
value = @cache[key] = yield unless value
value
end
def each_value
@cache.each do |_, value|
yield value
end
end
def clear
@cache.clear
end
end
end