mirror of
https://gitea.invidious.io/iv-org/shard-crystal-db.git
synced 2024-08-15 00:53:32 +00:00
allow DB to use a connection pool.
allow Driver to parse connection pool options for extensibility. fix waiting_resource counter after a timeout was generated.
This commit is contained in:
parent
b8cabee956
commit
a2c22c16cf
7 changed files with 46 additions and 22 deletions
|
@ -61,7 +61,7 @@ describe DB do
|
|||
it "should raise if the sole connection is been used" do
|
||||
with_dummy do |db|
|
||||
db.query "1" do |rs|
|
||||
expect_raises Exception, /DB Pool Exhausted/ do
|
||||
expect_raises DB::PoolTimeout do
|
||||
db.scalar "2"
|
||||
end
|
||||
end
|
||||
|
|
|
@ -187,7 +187,7 @@ end
|
|||
def with_dummy
|
||||
DummyDriver::DummyConnection.clear_connections
|
||||
|
||||
DB.open "dummy://host" do |db|
|
||||
DB.open "dummy://host?checkout_timeout=0.5" do |db|
|
||||
yield db
|
||||
end
|
||||
end
|
||||
|
|
|
@ -131,6 +131,13 @@ describe DB::Pool do
|
|||
end
|
||||
end
|
||||
|
||||
it "should be able to release after a timeout" do
|
||||
pool = DB::Pool.new(->{ Closable.new }, max_pool_size: 1, checkout_timeout: 0.1)
|
||||
a = pool.checkout
|
||||
pool.checkout rescue nil
|
||||
pool.release a
|
||||
end
|
||||
|
||||
it "should close if max idle amount is reached" do
|
||||
all = [] of Closable
|
||||
pool = DB::Pool.new(->{ Closable.new.tap { |c| all << c } }, max_pool_size: 3, max_idle_pool_size: 1)
|
||||
|
|
|
@ -6,8 +6,7 @@ require "uri"
|
|||
#
|
||||
# Drivers implementors check `Driver` class.
|
||||
#
|
||||
# Currently a *single connection* to the database is stablished.
|
||||
# In the future a connection pool and transaction support will be available.
|
||||
# DB manage a connection pool. The connection pool can be configured by `URI` query. See `Database`.
|
||||
#
|
||||
# ### Usage
|
||||
#
|
||||
|
|
|
@ -1,7 +1,14 @@
|
|||
require "http/params"
|
||||
|
||||
module DB
|
||||
# Acts as an entry point for database access.
|
||||
# Currently it creates a single connection to the database.
|
||||
# Eventually a connection pool will be handled.
|
||||
# Connections are managed by a pool.
|
||||
# The connection pool can be configured from URI parameters:
|
||||
#
|
||||
# - initial_pool_size (default 1)
|
||||
# - max_pool_size (default 1)
|
||||
# - max_idle_pool_size (default 1)
|
||||
# - checkout_timeout (default 5.0)
|
||||
#
|
||||
# It should be created from DB module. See `DB#open`.
|
||||
#
|
||||
|
@ -13,19 +20,21 @@ module DB
|
|||
# Returns the uri with the connection settings to the database
|
||||
getter uri
|
||||
|
||||
@connection : Connection?
|
||||
@pool : Pool(Connection)
|
||||
|
||||
# :nodoc:
|
||||
def initialize(@driver : Driver, @uri : URI)
|
||||
@in_pool = true
|
||||
@connection = @driver.build_connection(self)
|
||||
# TODO: PR HTTP::Params.new -> HTTP::Params.new(Hash(String, Array(String)).new)
|
||||
params = (query = uri.query) ? HTTP::Params.parse(query) : HTTP::Params.new(Hash(String, Array(String)).new)
|
||||
pool_options = @driver.connection_pool_options(params)
|
||||
|
||||
@pool = uninitialized Pool(Connection) # in order to use self in the factory proc
|
||||
@pool = Pool.new(->{ @driver.build_connection(self).as(Connection) }, **pool_options)
|
||||
end
|
||||
|
||||
# Closes all connection to the database.
|
||||
def close
|
||||
@connection.try &.close
|
||||
# prevent GC Warning: Finalization cycle involving discovered by mysql implementation
|
||||
@connection = nil
|
||||
@pool.close
|
||||
end
|
||||
|
||||
# :nodoc:
|
||||
|
@ -41,14 +50,12 @@ module DB
|
|||
|
||||
# :nodoc:
|
||||
def get_from_pool
|
||||
raise "DB Pool Exhausted" unless @in_pool
|
||||
@in_pool = false
|
||||
@connection.not_nil!
|
||||
@pool.checkout
|
||||
end
|
||||
|
||||
# :nodoc:
|
||||
def return_to_pool(connection)
|
||||
@in_pool = true
|
||||
@pool.release connection
|
||||
end
|
||||
|
||||
# yields a connection from the pool
|
||||
|
|
|
@ -27,5 +27,14 @@ module DB
|
|||
# driver implementation instructions.
|
||||
abstract class Driver
|
||||
abstract def build_connection(db : Database) : Connection
|
||||
|
||||
def connection_pool_options(params : HTTP::Params)
|
||||
{
|
||||
initial_pool_size: params.fetch("initial_pool_size", 1).to_i,
|
||||
max_pool_size: params.fetch("max_pool_size", 1).to_i,
|
||||
max_idle_pool_size: params.fetch("max_idle_pool_size", 1).to_i,
|
||||
checkout_timeout: params.fetch("checkout_timeout", 5.0).to_f,
|
||||
}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -72,12 +72,18 @@ module DB
|
|||
timeout.start
|
||||
# if there are no available resources, sleep until one is available
|
||||
@availability_channel.receive
|
||||
timeout.raise_if_reached
|
||||
if timeout.timeout_reached?
|
||||
dec_waiting_resource
|
||||
raise DB::PoolTimeout.new
|
||||
end
|
||||
|
||||
# double check there is something available to be checkedout
|
||||
while @available.empty?
|
||||
@availability_channel.receive
|
||||
timeout.raise_if_reached
|
||||
if timeout.timeout_reached?
|
||||
dec_waiting_resource
|
||||
raise DB::PoolTimeout.new
|
||||
end
|
||||
end
|
||||
|
||||
timeout.cancel
|
||||
|
@ -125,10 +131,6 @@ module DB
|
|||
def timeout_reached?
|
||||
@should_timeout
|
||||
end
|
||||
|
||||
def raise_if_reached
|
||||
raise DB::PoolTimeout.new if timeout_reached?
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue