Merged branch feature/pool_statement into feature/pool

This commit is contained in:
Brian J. Cardiff 2016-08-30 16:42:23 -03:00
commit 598aca50af
10 changed files with 223 additions and 72 deletions

View file

@ -39,4 +39,28 @@ describe DB::Database do
end end
end end
end end
it "should allow creation of more statements than pool connections" do
DB.open "dummy://localhost:1027?initial_pool_size=1&max_pool_size=2" do |db|
db.prepare("query1").should be_a(DB::PoolStatement)
db.prepare("query2").should be_a(DB::PoolStatement)
db.prepare("query3").should be_a(DB::PoolStatement)
end
end
it "should return same statement in pool per query" do
with_dummy do |db|
stmt = db.prepare("query1")
db.prepare("query2").should_not eq(stmt)
db.prepare("query1").should eq(stmt)
end
end
it "should close pool statements when closing db" do
stmt = uninitialized DB::PoolStatement
with_dummy do |db|
stmt = db.prepare("query1")
end
stmt.closed?.should be_true
end
end end

View file

@ -192,3 +192,11 @@ def with_dummy
yield db yield db
end end
end end
def with_dummy_connection
with_dummy do |db|
db.using_connection do |cnn|
yield cnn.as(DummyDriver::DummyConnection)
end
end
end

View file

@ -2,14 +2,14 @@ require "./spec_helper"
describe DB::Statement do describe DB::Statement do
it "should prepare statements" do it "should prepare statements" do
with_dummy do |db| with_dummy_connection do |cnn|
db.prepare("the query").should be_a(DB::Statement) cnn.prepare("the query").should be_a(DB::Statement)
end end
end end
it "should initialize positional params in query" do it "should initialize positional params in query" do
with_dummy do |db| with_dummy_connection do |cnn|
stmt = db.prepare("the query").as(DummyDriver::DummyStatement) stmt = cnn.prepare("the query").as(DummyDriver::DummyStatement)
stmt.query "a", 1, nil stmt.query "a", 1, nil
stmt.params[0].should eq("a") stmt.params[0].should eq("a")
stmt.params[1].should eq(1) stmt.params[1].should eq(1)
@ -18,8 +18,8 @@ describe DB::Statement do
end end
it "should initialize positional params in query with array" do it "should initialize positional params in query with array" do
with_dummy do |db| with_dummy_connection do |cnn|
stmt = db.prepare("the query").as(DummyDriver::DummyStatement) stmt = cnn.prepare("the query").as(DummyDriver::DummyStatement)
stmt.query ["a", 1, nil] stmt.query ["a", 1, nil]
stmt.params[0].should eq("a") stmt.params[0].should eq("a")
stmt.params[1].should eq(1) stmt.params[1].should eq(1)
@ -28,8 +28,8 @@ describe DB::Statement do
end end
it "should initialize positional params in exec" do it "should initialize positional params in exec" do
with_dummy do |db| with_dummy_connection do |cnn|
stmt = db.prepare("the query").as(DummyDriver::DummyStatement) stmt = cnn.prepare("the query").as(DummyDriver::DummyStatement)
stmt.exec "a", 1, nil stmt.exec "a", 1, nil
stmt.params[0].should eq("a") stmt.params[0].should eq("a")
stmt.params[1].should eq(1) stmt.params[1].should eq(1)
@ -38,8 +38,8 @@ describe DB::Statement do
end end
it "should initialize positional params in exec with array" do it "should initialize positional params in exec with array" do
with_dummy do |db| with_dummy_connection do |cnn|
stmt = db.prepare("the query").as(DummyDriver::DummyStatement) stmt = cnn.prepare("the query").as(DummyDriver::DummyStatement)
stmt.exec ["a", 1, nil] stmt.exec ["a", 1, nil]
stmt.params[0].should eq("a") stmt.params[0].should eq("a")
stmt.params[1].should eq(1) stmt.params[1].should eq(1)
@ -48,8 +48,8 @@ describe DB::Statement do
end end
it "should initialize positional params in scalar" do it "should initialize positional params in scalar" do
with_dummy do |db| with_dummy_connection do |cnn|
stmt = db.prepare("the query").as(DummyDriver::DummyStatement) stmt = cnn.prepare("the query").as(DummyDriver::DummyStatement)
stmt.scalar "a", 1, nil stmt.scalar "a", 1, nil
stmt.params[0].should eq("a") stmt.params[0].should eq("a")
stmt.params[1].should eq(1) stmt.params[1].should eq(1)
@ -58,8 +58,8 @@ describe DB::Statement do
end end
it "query with block should not close statement" do it "query with block should not close statement" do
with_dummy do |db| with_dummy_connection do |cnn|
stmt = db.prepare "3,4 1,2" stmt = cnn.prepare "3,4 1,2"
stmt.query stmt.query
stmt.closed?.should be_false stmt.closed?.should be_false
end end
@ -67,16 +67,16 @@ describe DB::Statement do
it "closing connection should close statement" do it "closing connection should close statement" do
stmt = uninitialized DB::Statement stmt = uninitialized DB::Statement
with_dummy do |db| with_dummy_connection do |cnn|
stmt = db.prepare "3,4 1,2" stmt = cnn.prepare "3,4 1,2"
stmt.query stmt.query
end end
stmt.closed?.should be_true stmt.closed?.should be_true
end end
it "query with block should not close statement" do it "query with block should not close statement" do
with_dummy do |db| with_dummy_connection do |cnn|
stmt = db.prepare "3,4 1,2" stmt = cnn.prepare "3,4 1,2"
stmt.query do |rs| stmt.query do |rs|
end end
stmt.closed?.should be_false stmt.closed?.should be_false
@ -84,8 +84,8 @@ describe DB::Statement do
end end
it "query should not close statement" do it "query should not close statement" do
with_dummy do |db| with_dummy_connection do |cnn|
stmt = db.prepare "3,4 1,2" stmt = cnn.prepare "3,4 1,2"
stmt.query do |rs| stmt.query do |rs|
end end
stmt.closed?.should be_false stmt.closed?.should be_false
@ -93,28 +93,28 @@ describe DB::Statement do
end end
it "scalar should not close statement" do it "scalar should not close statement" do
with_dummy do |db| with_dummy_connection do |cnn|
stmt = db.prepare "3,4 1,2" stmt = cnn.prepare "3,4 1,2"
stmt.scalar stmt.scalar
stmt.closed?.should be_false stmt.closed?.should be_false
end end
end end
it "exec should not close statement" do it "exec should not close statement" do
with_dummy do |db| with_dummy_connection do |cnn|
stmt = db.prepare "3,4 1,2" stmt = cnn.prepare "3,4 1,2"
stmt.exec stmt.exec
stmt.closed?.should be_false stmt.closed?.should be_false
end end
end end
it "connection should cache statements by query" do it "connection should cache statements by query" do
with_dummy do |db| with_dummy_connection do |cnn|
rs = db.query "1, ?", 2 rs = cnn.query "1, ?", 2
stmt = rs.statement stmt = rs.statement
rs.close rs.close
rs = db.query "1, ?", 4 rs = cnn.query "1, ?", 4
rs.statement.should be(stmt) rs.statement.should be(stmt)
end end
end end

View file

@ -119,12 +119,14 @@ module DB
end end
require "./db/pool" require "./db/pool"
require "./db/string_key_cache"
require "./db/query_methods" require "./db/query_methods"
require "./db/disposable" require "./db/disposable"
require "./db/database" require "./db/database"
require "./db/driver" require "./db/driver"
require "./db/connection" require "./db/connection"
require "./db/statement" require "./db/statement"
require "./db/pool_statement"
require "./db/result_set" require "./db/result_set"
require "./db/error" require "./db/error"
require "./db/mapping" require "./db/mapping"

View file

@ -1,8 +1,4 @@
module DB module DB
class Database; end
abstract class Statement; end
# Database driver implementors must subclass `Connection`. # Database driver implementors must subclass `Connection`.
# #
# Represents one active connection to a database. # Represents one active connection to a database.
@ -24,25 +20,20 @@ module DB
# :nodoc: # :nodoc:
getter database getter database
@statements_cache = {} of String => Statement @statements_cache = StringKeyCache(Statement).new
def initialize(@database : Database) def initialize(@database : Database)
end end
# :nodoc: # :nodoc:
def prepare(query) : Statement def prepare(query) : Statement
stmt = @statements_cache.fetch(query, nil) @statements_cache.fetch(query) { build_statement(query) }
stmt = @statements_cache[query] = build_statement(query) unless stmt
stmt
end end
abstract def build_statement(query) : Statement abstract def build_statement(query) : Statement
protected def do_close protected def do_close
@statements_cache.each do |_, stmt| @statements_cache.each_value &.close
stmt.close
end
@statements_cache.clear @statements_cache.clear
end end
end end

View file

@ -24,11 +24,11 @@ module DB
@pool : Pool(Connection) @pool : Pool(Connection)
@setup_connection : Connection -> Nil @setup_connection : Connection -> Nil
@statements_cache = StringKeyCache(PoolStatement).new
# :nodoc: # :nodoc:
def initialize(@driver : Driver, @uri : URI) def initialize(@driver : Driver, @uri : URI)
# TODO: PR HTTP::Params.new -> HTTP::Params.new(Hash(String, Array(String)).new) params = HTTP::Params.parse(uri.query || "")
params = (query = uri.query) ? HTTP::Params.parse(query) : HTTP::Params.new(Hash(String, Array(String)).new)
pool_options = @driver.connection_pool_options(params) pool_options = @driver.connection_pool_options(params)
@setup_connection = ->(conn : Connection) {} @setup_connection = ->(conn : Connection) {}
@ -49,23 +49,20 @@ module DB
# Closes all connection to the database. # Closes all connection to the database.
def close def close
@statements_cache.each_value &.close
@statements_cache.clear
@pool.close @pool.close
end end
# :nodoc: # :nodoc:
def prepare(query) def prepare(query)
conn = get_from_pool @statements_cache.fetch(query) { PoolStatement.new(self, query) }
begin
conn.prepare(query)
rescue ex
return_to_pool(conn)
raise ex
end
end end
# :nodoc: # :nodoc:
def get_from_pool def checkout_some(candidates : Enumerable(Connection)) : {Connection, Bool}
@pool.checkout @pool.checkout_some candidates
end end
# :nodoc: # :nodoc:
@ -77,7 +74,7 @@ module DB
# the connection is returned to the pool after # the connection is returned to the pool after
# when the block ends # when the block ends
def using_connection def using_connection
connection = get_from_pool connection = @pool.checkout
begin begin
yield connection yield connection
ensure ensure

View file

@ -36,6 +36,25 @@ module DB
resource resource
end end
# ```
# selected, is_candidate = pool.checkout_some(candidates)
# ```
# `selected` be a resource from the `candidates` list and `is_candidate` == `true`
# or `selected` will be a new resource adn `is_candidate` == `false`
def checkout_some(candidates : Enumerable(T)) : {T, Bool}
# TODO honor candidates while waiting for availables
# this will allow us to remove `candidates.includes?(resource)`
candidates.each do |resource|
if is_available?(resource)
@available.delete resource
return {resource, true}
end
end
resource = checkout
{resource, candidates.includes?(resource)}
end
def release(resource : T) : Nil def release(resource : T) : Nil
if can_increase_idle_pool if can_increase_idle_pool
@available << resource @available << resource

69
src/db/pool_statement.cr Normal file
View file

@ -0,0 +1,69 @@
module DB
# When a statement is to be executed in a DB that has a connection pool
# a statement from the DB needs to be able to represent a statement in any
# of the connections of the pool. Otherwise the user will need to deal with
# actual connections in some point.
class PoolStatement
include StatementMethods
# connections where the statement was prepared
@connections = Set(Connection).new
def initialize(@db : Database, @query : String)
# Prepares a statement on some connection
# otherwise the preparation is delayed until the first execution.
# After the first initialization the connection must be released
# it will be checked out when executing it.
get_statement.release_connection
# TODO use a round-robin selection in the pool so multiple sequentially
# initialized statements are assigned to different connections.
end
protected def do_close
# TODO close all statements on all connections.
# currently statements are closed when the connection is closed.
# WHAT-IF the connection is busy? Should each statement be able to
# deallocate itself when the connection is free.
@connections.clear
end
# See `QueryMethods#exec`
def exec : ExecResult
get_statement.exec
end
# See `QueryMethods#exec`
def exec(*args) : ExecResult
get_statement.exec(*args)
end
# See `QueryMethods#exec`
def exec(args : Array) : ExecResult
get_statement.exec(args)
end
# See `QueryMethods#query`
def query : ResultSet
get_statement.query
end
# See `QueryMethods#query`
def query(*args) : ResultSet
get_statement.query(*args)
end
# See `QueryMethods#query`
def query(args : Array) : ResultSet
get_statement.query(args)
end
# builds a statement over a real connection
# the conneciton is registered in `@connections`
private def get_statement : Statement
conn, existing = @db.checkout_some(@connections)
@connections << conn unless existing
conn.prepare(@query)
end
end
end

View file

@ -1,4 +1,44 @@
module DB module DB
# Common interface for connection based statements
# and for connection pool statements.
module StatementMethods
include Disposable
protected def do_close
end
# See `QueryMethods#scalar`
def scalar(*args)
query(*args) do |rs|
rs.each do
return rs.read
end
end
raise "no results"
end
# See `QueryMethods#query`
def query(*args)
rs = query(*args)
yield rs ensure rs.close
end
# See `QueryMethods#exec`
abstract def exec : ExecResult
# See `QueryMethods#exec`
abstract def exec(*args) : ExecResult
# See `QueryMethods#exec`
abstract def exec(args : Array) : ExecResult
# See `QueryMethods#query`
abstract def query : ResultSet
# See `QueryMethods#query`
abstract def query(*args) : ResultSet
# See `QueryMethods#query`
abstract def query(args : Array) : ResultSet
end
# Represents a prepared query in a `Connection`. # Represents a prepared query in a `Connection`.
# It should be created by `QueryMethods`. # It should be created by `QueryMethods`.
# #
@ -10,7 +50,7 @@ module DB
# 4. `#perform_exec` executes a query that is expected to return an `ExecResult` # 4. `#perform_exec` executes a query that is expected to return an `ExecResult`
# 6. `#do_close` is called to release the statement resources. # 6. `#do_close` is called to release the statement resources.
abstract class Statement abstract class Statement
include Disposable include StatementMethods
# :nodoc: # :nodoc:
getter connection getter connection
@ -18,9 +58,6 @@ module DB
def initialize(@connection : Connection) def initialize(@connection : Connection)
end end
protected def do_close
end
def release_connection def release_connection
@connection.database.return_to_pool(@connection) @connection.database.return_to_pool(@connection)
end end
@ -41,17 +78,6 @@ module DB
perform_exec_and_release(args) perform_exec_and_release(args)
end end
# See `QueryMethods#scalar`
def scalar(*args)
query(*args) do |rs|
rs.each do
return rs.read
end
end
raise "no results"
end
# See `QueryMethods#query` # See `QueryMethods#query`
def query def query
perform_query Tuple.new perform_query Tuple.new
@ -67,12 +93,6 @@ module DB
perform_query args perform_query args
end end
# See `QueryMethods#query`
def query(*args)
rs = query(*args)
yield rs ensure rs.close
end
private def perform_exec_and_release(args : Enumerable) : ExecResult private def perform_exec_and_release(args : Enumerable) : ExecResult
return perform_exec(args) return perform_exec(args)
ensure ensure

View file

@ -0,0 +1,21 @@
module DB
class StringKeyCache(T)
@cache = {} of String => T
def fetch(key : String) : T
value = @cache.fetch(key, nil)
value = @cache[key] = yield unless value
value
end
def each_value
@cache.each do |_, value|
yield value
end
end
def clear
@cache.clear
end
end
end