Refactor connection factory (#181)

* Start moving out URI from ConnectionContext

Create connections with an initial context. Database will set itself as context after connection has been created

* Migrate to simpler/decoupled factory in driver

This allows more freedom on how the connection is created. It will no longer need to have an explicit reference to the connection URI

* Introduce DB::Connection::Options

Move prepared_statements out from ConnectionContext

* Delegate options parsing to driver

DRY parsing connection options for database

* Introduce DB::Pool::Options

* Rename Driver#connection_pool_options to pool_options

* Drop driver getter from database

* Drop uri getter from database

* Add public Database#initialize method

* Drop :nodoc: Database#initialize

* Pass spec helper explicitly (to access methods within each spec)

* Update docs

* Update src/db/pool.cr

Co-authored-by: Beta Ziliani <beta@manas.tech>

* Use ConnectionBuilder instead of procs

* Fix inferred type when there is a single concrete connection type

* Update src/db/driver.cr

Co-authored-by: Beta Ziliani <beta@manas.tech>

---------

Co-authored-by: Beta Ziliani <beta@manas.tech>
This commit is contained in:
Brian J. Cardiff 2023-06-22 22:03:08 -03:00 committed by GitHub
parent 65b926c926
commit f13846b133
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 178 additions and 86 deletions

View file

@ -36,6 +36,15 @@ class FooValue
end
class FooDriver < DB::Driver
class FooConnectionBuilder < DB::ConnectionBuilder
def initialize(@options : DB::Connection::Options)
end
def build : DB::Connection
FooConnection.new(@options)
end
end
alias Any = DB::Any | FooValue
@@row = [] of Any
@ -47,8 +56,9 @@ class FooDriver < DB::Driver
@@row
end
def build_connection(context : DB::ConnectionContext) : DB::Connection
FooConnection.new(context)
def connection_builder(uri : URI) : DB::ConnectionBuilder
params = HTTP::Params.parse(uri.query || "")
FooConnectionBuilder.new(connection_options(params))
end
class FooConnection < DB::Connection
@ -99,6 +109,15 @@ class BarValue
end
class BarDriver < DB::Driver
class BarConnectionBuilder < DB::ConnectionBuilder
def initialize(@options : DB::Connection::Options)
end
def build : DB::Connection
BarConnection.new(@options)
end
end
alias Any = DB::Any | BarValue
@@row = [] of Any
@ -110,8 +129,9 @@ class BarDriver < DB::Driver
@@row
end
def build_connection(context : DB::ConnectionContext) : DB::Connection
BarConnection.new(context)
def connection_builder(uri : URI) : DB::ConnectionBuilder
params = HTTP::Params.parse(uri.query || "")
BarConnectionBuilder.new(connection_options(params))
end
class BarConnection < DB::Connection
@ -156,8 +176,8 @@ DB.register_driver "bar", BarDriver
describe DB do
it "should be able to register multiple drivers" do
DB.open("foo://host").driver.should be_a(FooDriver)
DB.open("bar://host").driver.should be_a(BarDriver)
DB.open("foo://host").checkout.should be_a(FooDriver::FooConnection)
DB.open("bar://host").checkout.should be_a(BarDriver::BarConnection)
end
it "Foo and Bar drivers should return fake_row" do

View file

@ -9,12 +9,9 @@ describe DB do
DB.driver_class("dummy").should eq(DummyDriver)
end
it "should instantiate driver with connection uri" do
it "should create dummy connection" do
db = DB.open "dummy://localhost:1027"
db.driver.should be_a(DummyDriver)
db.uri.scheme.should eq("dummy")
db.uri.host.should eq("localhost")
db.uri.port.should eq(1027)
db.checkout.should be_a(DummyDriver::DummyConnection)
end
it "should create a connection and close it" do

View file

@ -2,13 +2,23 @@ require "spec"
require "../src/db"
class DummyDriver < DB::Driver
def build_connection(context : DB::ConnectionContext) : DB::Connection
DummyConnection.new(context)
class DummyConnectionBuilder < DB::ConnectionBuilder
def initialize(@options : DB::Connection::Options)
end
def build : DB::Connection
DummyConnection.new(@options)
end
end
def connection_builder(uri : URI) : DB::ConnectionBuilder
params = HTTP::Params.parse(uri.query || "")
DummyConnectionBuilder.new(connection_options(params))
end
class DummyConnection < DB::Connection
def initialize(context)
super(context)
def initialize(options : DB::Connection::Options)
super(options)
@connected = true
@@connections ||= [] of DummyConnection
@@connections.not_nil! << self

View file

@ -22,10 +22,10 @@ describe DB::Pool do
expected_per_connection = 5
requests = fixed_pool_size * expected_per_connection
pool = DB::Pool.new(
pool = DB::Pool.new(DB::Pool::Options.new(
initial_pool_size: fixed_pool_size,
max_pool_size: fixed_pool_size,
max_idle_pool_size: fixed_pool_size) {
max_idle_pool_size: fixed_pool_size)) {
HTTP::Client.new(URI.parse("http://127.0.0.1:#{address.port}/"))
}

View file

@ -57,15 +57,19 @@ class Closable
end
end
private def create_pool(**options, &factory : -> T) forall T
DB::Pool.new(DB::Pool::Options.new(**options), &factory)
end
describe DB::Pool do
it "should use proc to create objects" do
block_called = 0
pool = DB::Pool.new(initial_pool_size: 3) { block_called += 1; Closable.new }
pool = create_pool(initial_pool_size: 3) { block_called += 1; Closable.new }
block_called.should eq(3)
end
it "should get resource" do
pool = DB::Pool.new { Closable.new }
pool = create_pool { Closable.new }
resource = pool.checkout
resource.should be_a Closable
resource.before_checkout_called.should be_true
@ -73,18 +77,18 @@ describe DB::Pool do
it "should be available if not checkedout" do
resource = uninitialized Closable
pool = DB::Pool.new(initial_pool_size: 1) { resource = Closable.new }
pool = create_pool(initial_pool_size: 1) { resource = Closable.new }
pool.is_available?(resource).should be_true
end
it "should not be available if checkedout" do
pool = DB::Pool.new { Closable.new }
pool = create_pool { Closable.new }
resource = pool.checkout
pool.is_available?(resource).should be_false
end
it "should be available if returned" do
pool = DB::Pool.new { Closable.new }
pool = create_pool { Closable.new }
resource = pool.checkout
resource.after_release_called.should be_false
pool.release resource
@ -93,7 +97,7 @@ describe DB::Pool do
end
it "should wait for available resource" do
pool = DB::Pool.new(max_pool_size: 1, initial_pool_size: 1) { Closable.new }
pool = create_pool(max_pool_size: 1, initial_pool_size: 1) { Closable.new }
b_cnn_request = ShouldSleepingOp.new
wait_a = WaitFor.new
@ -121,7 +125,7 @@ describe DB::Pool do
it "should create new if max was not reached" do
block_called = 0
pool = DB::Pool.new(max_pool_size: 2, initial_pool_size: 1) { block_called += 1; Closable.new }
pool = create_pool(max_pool_size: 2, initial_pool_size: 1) { block_called += 1; Closable.new }
block_called.should eq 1
pool.checkout
block_called.should eq 1
@ -131,7 +135,7 @@ describe DB::Pool do
it "should reuse returned resources" do
all = [] of Closable
pool = DB::Pool.new(max_pool_size: 2, initial_pool_size: 1) { Closable.new.tap { |c| all << c } }
pool = create_pool(max_pool_size: 2, initial_pool_size: 1) { Closable.new.tap { |c| all << c } }
pool.checkout
b1 = pool.checkout
pool.release b1
@ -143,7 +147,7 @@ describe DB::Pool do
it "should close available and total" do
all = [] of Closable
pool = DB::Pool.new(max_pool_size: 2, initial_pool_size: 1) { Closable.new.tap { |c| all << c } }
pool = create_pool(max_pool_size: 2, initial_pool_size: 1) { Closable.new.tap { |c| all << c } }
a = pool.checkout
b = pool.checkout
pool.release b
@ -157,7 +161,7 @@ describe DB::Pool do
end
it "should timeout" do
pool = DB::Pool.new(max_pool_size: 1, checkout_timeout: 0.1) { Closable.new }
pool = create_pool(max_pool_size: 1, checkout_timeout: 0.1) { Closable.new }
pool.checkout
expect_raises DB::PoolTimeout do
pool.checkout
@ -165,7 +169,7 @@ describe DB::Pool do
end
it "should be able to release after a timeout" do
pool = DB::Pool.new(max_pool_size: 1, checkout_timeout: 0.1) { Closable.new }
pool = create_pool(max_pool_size: 1, checkout_timeout: 0.1) { Closable.new }
a = pool.checkout
pool.checkout rescue nil
pool.release a
@ -173,7 +177,7 @@ describe DB::Pool do
it "should close if max idle amount is reached" do
all = [] of Closable
pool = DB::Pool.new(max_pool_size: 3, max_idle_pool_size: 1) { Closable.new.tap { |c| all << c } }
pool = create_pool(max_pool_size: 3, max_idle_pool_size: 1) { Closable.new.tap { |c| all << c } }
pool.checkout
pool.checkout
pool.checkout
@ -191,7 +195,7 @@ describe DB::Pool do
end
it "should not return closed resources to the pool" do
pool = DB::Pool.new(max_pool_size: 1, max_idle_pool_size: 1) { Closable.new }
pool = create_pool(max_pool_size: 1, max_idle_pool_size: 1) { Closable.new }
# pool size 1 should be reusing the one resource
resource1 = pool.checkout
@ -209,7 +213,7 @@ describe DB::Pool do
it "should create resource after max_pool was reached if idle forced some close up" do
all = [] of Closable
pool = DB::Pool.new(max_pool_size: 3, max_idle_pool_size: 1) { Closable.new.tap { |c| all << c } }
pool = create_pool(max_pool_size: 3, max_idle_pool_size: 1) { Closable.new.tap { |c| all << c } }
pool.checkout
pool.checkout
pool.checkout

View file

@ -152,7 +152,13 @@ module DB
end
private def self.build_database(uri : URI)
Database.new(build_driver(uri), uri)
driver = build_driver(uri)
params = HTTP::Params.parse(uri.query || "")
connection_options = driver.connection_options(params)
pool_options = driver.pool_options(params)
builder = driver.connection_builder(uri)
factory = ->{ builder.build }
Database.new(connection_options, pool_options, &factory)
end
private def self.build_connection(connection_string : String)
@ -160,7 +166,7 @@ module DB
end
private def self.build_connection(uri : URI)
build_driver(uri).build_connection(SingleConnectionContext.new(uri)).as(Connection)
build_driver(uri).connection_builder(uri).build
end
private def self.build_driver(uri : URI)
@ -188,6 +194,7 @@ require "./db/enumerable_concat"
require "./db/query_methods"
require "./db/session_methods"
require "./db/disposable"
require "./db/connection_builder"
require "./db/driver"
require "./db/statement"
require "./db/begin_transaction"

View file

@ -23,16 +23,28 @@ module DB
include SessionMethods(Connection, Statement)
include BeginTransaction
record Options,
# Return whether the statements should be prepared by default
prepared_statements : Bool = true do
def self.from_http_params(params : HTTP::Params, default = Options.new)
Options.new(
prepared_statements: DB.fetch_bool(params, "prepared_statements", default.prepared_statements)
)
end
end
# :nodoc:
getter context
property context : ConnectionContext = SingleConnectionContext.default
@statements_cache = StringKeyCache(Statement).new
@transaction = false
getter? prepared_statements : Bool
# :nodoc:
property auto_release : Bool = true
def initialize(@context : ConnectionContext)
@prepared_statements = @context.prepared_statements?
def initialize(@options : Options)
end
def prepared_statements? : Bool
@options.prepared_statements
end
# :nodoc:
@ -59,7 +71,7 @@ module DB
protected def do_close
@statements_cache.each_value &.close
@statements_cache.clear
@context.discard self
context.discard self
end
# :nodoc:
@ -75,7 +87,7 @@ module DB
# managed by the database. Should be used
# only if the connection was obtained by `Database#checkout`.
def release
@context.release(self)
context.release(self)
end
# :nodoc:

View file

@ -0,0 +1,8 @@
module DB
# A connection factory with a specific configuration.
#
# See `Driver#connection_builder`.
abstract class ConnectionBuilder
abstract def build : Connection
end
end

View file

@ -1,11 +1,5 @@
module DB
module ConnectionContext
# Returns the uri with the connection settings to the database
abstract def uri : URI
# Return whether the statements should be prepared by default
abstract def prepared_statements? : Bool
# Indicates that the *connection* was permanently closed
# and should not be used in the future.
abstract def discard(connection : Connection)
@ -19,13 +13,7 @@ module DB
class SingleConnectionContext
include ConnectionContext
getter uri : URI
getter? prepared_statements : Bool
def initialize(@uri : URI)
params = HTTP::Params.parse(uri.query || "")
@prepared_statements = DB.fetch_bool(params, "prepared_statements", true)
end
class_getter default : SingleConnectionContext = SingleConnectionContext.new
def discard(connection : Connection)
end

View file

@ -10,8 +10,9 @@ module DB
#
# ## Database URI
#
# Connection parameters are configured in a URI. The format is specified by the individual
# database drivers. See the [reference book](https://crystal-lang.org/reference/database/) for examples.
# Connection parameters are usually in a URI. The format is specified by the individual
# database drivers, yet there are some common properties names usually shared.
# See the [reference book](https://crystal-lang.org/reference/database/) for examples.
#
# The connection pool can be configured from URI parameters:
#
@ -31,36 +32,33 @@ module DB
include SessionMethods(Database, PoolStatement)
include ConnectionContext
# :nodoc:
getter driver
# :nodoc:
getter pool
# Returns the uri with the connection settings to the database
getter uri : URI
getter? prepared_statements : Bool
@connection_options : Connection::Options
@pool : Pool(Connection)
@setup_connection : Connection -> Nil
@statements_cache = StringKeyCache(PoolPreparedStatement).new
# :nodoc:
def initialize(@driver : Driver, @uri : URI)
params = HTTP::Params.parse(uri.query || "")
@prepared_statements = DB.fetch_bool(params, "prepared_statements", true)
pool_options = @driver.connection_pool_options(params)
# Initialize a database with the specified options and connection factory.
# This covers more advanced use cases that might not be supported by an URI connection string such as tunneling connection.
def initialize(connection_options : Connection::Options, pool_options : Pool::Options, &factory : -> Connection)
@connection_options = connection_options
@setup_connection = ->(conn : Connection) {}
@pool = uninitialized Pool(Connection) # in order to use self in the factory proc
@pool = Pool.new(**pool_options) {
conn = @driver.build_connection(self).as(Connection)
@pool = Pool(Connection).new(pool_options) {
conn = factory.call
conn.auto_release = false
conn.context = self
@setup_connection.call conn
conn
}
end
def prepared_statements? : Bool
@connection_options.prepared_statements
end
# Run the specified block every time a new connection is established, yielding the new connection
# to the block.
#

View file

@ -1,21 +1,23 @@
module DB
# Database driver implementors must subclass `Driver`,
# register with a driver_name using `DB#register_driver` and
# override the factory method `#build_connection`.
# override the factory method `#connection_builder`.
#
# ```
# require "db"
#
# class FakeDriver < DB::Driver
# def build_connection(context : DB::ConnectionContext)
# FakeConnection.new context
# def connection_builder(uri : URI) : Proc(DB::Connection)
# params = HTTP::Params.parse(uri.query || "")
# options = connection_options(params)
# ->{ FakeConnection.new(options).as(DB::Connection) }
# end
# end
#
# DB.register_driver "fake", FakeDriver
# ```
#
# Access to this fake datbase will be available with
# Access to this fake database will be available with
#
# ```
# DB.open "fake://..." do |db|
@ -25,18 +27,22 @@ module DB
#
# Refer to `Connection`, `Statement` and `ResultSet` for further
# driver implementation instructions.
#
# Override `#connection_options` and `#pool_options` to provide custom
# defaults or parsing of the connection string URI.
abstract class Driver
abstract def build_connection(context : ConnectionContext) : Connection
# Returns a new connection factory.
#
# NOTE: For implementors *uri* should be parsed once. If all the options
# are sound a ConnectionBuilder is returned.
abstract def connection_builder(uri : URI) : ConnectionBuilder
def connection_pool_options(params : HTTP::Params)
{
initial_pool_size: params.fetch("initial_pool_size", 1).to_i,
max_pool_size: params.fetch("max_pool_size", 0).to_i,
max_idle_pool_size: params.fetch("max_idle_pool_size", 1).to_i,
checkout_timeout: params.fetch("checkout_timeout", 5.0).to_f,
retry_attempts: params.fetch("retry_attempts", 1).to_i,
retry_delay: params.fetch("retry_delay", 1.0).to_f,
}
def connection_options(params : HTTP::Params) : Connection::Options
Connection::Options.from_http_params(params)
end
def pool_options(params : HTTP::Params) : Pool::Options
Pool::Options.from_http_params(params)
end
end
end

View file

@ -4,6 +4,31 @@ require "./error"
module DB
class Pool(T)
record Options,
# initial number of connections in the pool
initial_pool_size : Int32 = 1,
# maximum amount of connections in the pool (Idle + InUse). 0 means no maximum.
max_pool_size : Int32 = 0,
# maximum amount of idle connections in the pool
max_idle_pool_size : Int32 = 1,
# seconds to wait before timeout while doing a checkout
checkout_timeout : Float64 = 5.0,
# maximum amount of retry attempts to reconnect to the db. See `Pool#retry`
retry_attempts : Int32 = 1,
# seconds to wait before a retry attempt
retry_delay : Float64 = 0.2 do
def self.from_http_params(params : HTTP::Params, default = Options.new)
Options.new(
initial_pool_size: params.fetch("initial_pool_size", default.initial_pool_size).to_i,
max_pool_size: params.fetch("max_pool_size", default.max_pool_size).to_i,
max_idle_pool_size: params.fetch("max_idle_pool_size", default.max_idle_pool_size).to_i,
checkout_timeout: params.fetch("checkout_timeout", default.checkout_timeout).to_f,
retry_attempts: params.fetch("retry_attempts", default.retry_attempts).to_i,
retry_delay: params.fetch("retry_delay", default.retry_delay).to_f,
)
end
end
# Pool configuration
# initial number of connections in the pool
@ -37,8 +62,25 @@ module DB
# global pool mutex
@mutex : Mutex
def initialize(@initial_pool_size = 1, @max_pool_size = 0, @max_idle_pool_size = 1, @checkout_timeout = 5.0,
@retry_attempts = 1, @retry_delay = 0.2, &@factory : -> T)
@[Deprecated("Use `#new` with DB::Pool::Options instead")]
def initialize(initial_pool_size = 1, max_pool_size = 0, max_idle_pool_size = 1, checkout_timeout = 5.0,
retry_attempts = 1, retry_delay = 0.2, &factory : -> T)
initialize(
Options.new(
initial_pool_size: initial_pool_size, max_pool_size: max_pool_size,
max_idle_pool_size: max_idle_pool_size, checkout_timeout: checkout_timeout,
retry_attempts: retry_attempts, retry_delay: retry_delay),
&factory)
end
def initialize(pool_options : Options = Options.new, &@factory : -> T)
@initial_pool_size = pool_options.initial_pool_size
@max_pool_size = pool_options.max_pool_size
@max_idle_pool_size = pool_options.max_idle_pool_size
@checkout_timeout = pool_options.checkout_timeout
@retry_attempts = pool_options.retry_attempts
@retry_delay = pool_options.retry_delay
@availability_channel = Channel(Nil).new
@waiting_resource = 0
@inflight = 0

View file

@ -523,7 +523,7 @@ module DB
def self.run(description = "as a db")
ctx = self.new
with ctx yield
with ctx yield ctx
describe description do
ctx.include_shared_specs