mirror of
https://gitea.invidious.io/iv-org/shard-crystal-db.git
synced 2024-08-15 00:53:32 +00:00
Add connection retry logic to connection pool
This commit is contained in:
parent
dbf7c94ef4
commit
47e7d826e8
8 changed files with 102 additions and 9 deletions
|
@ -63,4 +63,28 @@ describe DB::Database do
|
|||
end
|
||||
stmt.closed?.should be_true
|
||||
end
|
||||
|
||||
it "should not reconnect if connection is lost and retry_attempts=0" do
|
||||
DummyDriver::DummyConnection.clear_connections
|
||||
DB.open "dummy://localhost:1027?initial_pool_size=1&max_pool_size=1&retry_attempts=0" do |db|
|
||||
db.exec("stmt1")
|
||||
DummyDriver::DummyConnection.connections.size.should eq(1)
|
||||
DummyDriver::DummyConnection.connections.first.disconnect!
|
||||
expect_raises DB::PoolRetryAttemptsExceeded do
|
||||
db.exec("stmt1")
|
||||
end
|
||||
DummyDriver::DummyConnection.connections.size.should eq(1)
|
||||
end
|
||||
end
|
||||
|
||||
it "should reconnect if connection is lost and executing same statement" do
|
||||
DummyDriver::DummyConnection.clear_connections
|
||||
DB.open "dummy://localhost:1027?initial_pool_size=1&max_pool_size=1&retry_attempts=1" do |db|
|
||||
db.exec("stmt1")
|
||||
DummyDriver::DummyConnection.connections.size.should eq(1)
|
||||
DummyDriver::DummyConnection.connections.first.disconnect!
|
||||
db.exec("stmt1")
|
||||
DummyDriver::DummyConnection.connections.size.should eq(2)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -9,6 +9,7 @@ class DummyDriver < DB::Driver
|
|||
class DummyConnection < DB::Connection
|
||||
def initialize(db)
|
||||
super(db)
|
||||
@connected = true
|
||||
@@connections ||= [] of DummyConnection
|
||||
@@connections.not_nil! << self
|
||||
end
|
||||
|
@ -29,6 +30,14 @@ class DummyDriver < DB::Driver
|
|||
0
|
||||
end
|
||||
|
||||
def check
|
||||
raise DB::ConnectionLost.new(self) unless @connected
|
||||
end
|
||||
|
||||
def disconnect!
|
||||
@connected = false
|
||||
end
|
||||
|
||||
protected def do_close
|
||||
super
|
||||
end
|
||||
|
@ -43,11 +52,13 @@ class DummyDriver < DB::Driver
|
|||
end
|
||||
|
||||
protected def perform_query(args : Enumerable)
|
||||
@connection.as(DummyConnection).check
|
||||
set_params args
|
||||
DummyResultSet.new self, @query
|
||||
end
|
||||
|
||||
protected def perform_exec(args : Enumerable)
|
||||
@connection.as(DummyConnection).check
|
||||
set_params args
|
||||
raise "forced exception due to query" if @query == "raise"
|
||||
DB::ExecResult.new 0i64, 0_i64
|
||||
|
|
|
@ -14,6 +14,9 @@ module DB
|
|||
# Override `#build_statement` method in order to return a prepared `Statement` to allow querying.
|
||||
# See also `Statement` to define how the statements are executed.
|
||||
#
|
||||
# If at any give moment the connection is lost a DB::ConnectionLost should be raised. This will
|
||||
# allow the connection pool to try to reconnect or use another connection if available.
|
||||
#
|
||||
abstract class Connection
|
||||
include Disposable
|
||||
include QueryMethods
|
||||
|
|
|
@ -9,6 +9,8 @@ module DB
|
|||
# - max_pool_size (default 1)
|
||||
# - max_idle_pool_size (default 1)
|
||||
# - checkout_timeout (default 5.0)
|
||||
# - retry_attempts (default 1)
|
||||
# - retry_delay (in seconds, default 1.0)
|
||||
#
|
||||
# It should be created from DB module. See `DB#open`.
|
||||
#
|
||||
|
@ -82,6 +84,13 @@ module DB
|
|||
end
|
||||
end
|
||||
|
||||
# :nodoc:
|
||||
def retry
|
||||
@pool.retry do
|
||||
yield
|
||||
end
|
||||
end
|
||||
|
||||
include QueryMethods
|
||||
end
|
||||
end
|
||||
|
|
|
@ -34,6 +34,8 @@ module DB
|
|||
max_pool_size: params.fetch("max_pool_size", 1).to_i,
|
||||
max_idle_pool_size: params.fetch("max_idle_pool_size", 1).to_i,
|
||||
checkout_timeout: params.fetch("checkout_timeout", 5.0).to_f,
|
||||
retry_attempts: params.fetch("retry_attempts", 1).to_i,
|
||||
retry_delay: params.fetch("retry_delay", 1.0).to_f,
|
||||
}
|
||||
end
|
||||
end
|
||||
|
|
|
@ -7,4 +7,14 @@ module DB
|
|||
|
||||
class PoolTimeout < Error
|
||||
end
|
||||
|
||||
class PoolRetryAttemptsExceeded < Error
|
||||
end
|
||||
|
||||
class ConnectionLost < Error
|
||||
getter connection : Connection
|
||||
|
||||
def initialize(@connection)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -6,8 +6,12 @@ module DB
|
|||
@available = Set(T).new
|
||||
@total = [] of T
|
||||
@checkout_timeout : Float64
|
||||
# maximum amount of retry attempts to reconnect to the db. See `Pool#retry`.
|
||||
@retry_attempts : Int32
|
||||
@retry_delay : Float64
|
||||
|
||||
def initialize(@initial_pool_size = 1, @max_pool_size = 1, @max_idle_pool_size = 1, @checkout_timeout = 5.0, &@factory : -> T)
|
||||
def initialize(@initial_pool_size = 1, @max_pool_size = 1, @max_idle_pool_size = 1, @checkout_timeout = 5.0,
|
||||
@retry_attempts = 1, @retry_delay = 0.2, &@factory : -> T)
|
||||
@initial_pool_size.times { build_resource }
|
||||
|
||||
@availability_channel = Channel(Nil).new
|
||||
|
@ -67,6 +71,28 @@ module DB
|
|||
end
|
||||
end
|
||||
|
||||
# :nodoc:
|
||||
# Will retry the block if a `ConnectionLost` exception is thrown.
|
||||
# It will try to reuse all of the available connection right away,
|
||||
# but if a new connection is needed there is a `retry_delay` seconds delay.
|
||||
def retry
|
||||
current_available = @available.size
|
||||
|
||||
(current_available + @retry_attempts).times do |i|
|
||||
begin
|
||||
sleep @retry_delay if i >= current_available
|
||||
return yield
|
||||
rescue e : ConnectionLost
|
||||
# if the connection is lost close it to release resources
|
||||
# and remove it from the known pool.
|
||||
@total.delete(e.connection)
|
||||
@available.delete(e.connection)
|
||||
e.connection.close
|
||||
end
|
||||
end
|
||||
raise PoolRetryAttemptsExceeded.new
|
||||
end
|
||||
|
||||
# :nodoc:
|
||||
def each_resource
|
||||
@available.each do |resource|
|
||||
|
|
|
@ -14,7 +14,7 @@ module DB
|
|||
# otherwise the preparation is delayed until the first execution.
|
||||
# After the first initialization the connection must be released
|
||||
# it will be checked out when executing it.
|
||||
get_statement.release_connection
|
||||
statement_with_retry &.release_connection
|
||||
# TODO use a round-robin selection in the pool so multiple sequentially
|
||||
# initialized statements are assigned to different connections.
|
||||
end
|
||||
|
@ -30,40 +30,48 @@ module DB
|
|||
|
||||
# See `QueryMethods#exec`
|
||||
def exec : ExecResult
|
||||
get_statement.exec
|
||||
statement_with_retry &.exec
|
||||
end
|
||||
|
||||
# See `QueryMethods#exec`
|
||||
def exec(*args) : ExecResult
|
||||
get_statement.exec(*args)
|
||||
statement_with_retry &.exec(*args)
|
||||
end
|
||||
|
||||
# See `QueryMethods#exec`
|
||||
def exec(args : Array) : ExecResult
|
||||
get_statement.exec(args)
|
||||
statement_with_retry &.exec(args)
|
||||
end
|
||||
|
||||
# See `QueryMethods#query`
|
||||
def query : ResultSet
|
||||
get_statement.query
|
||||
statement_with_retry &.query
|
||||
end
|
||||
|
||||
# See `QueryMethods#query`
|
||||
def query(*args) : ResultSet
|
||||
get_statement.query(*args)
|
||||
statement_with_retry &.query(*args)
|
||||
end
|
||||
|
||||
# See `QueryMethods#query`
|
||||
def query(args : Array) : ResultSet
|
||||
get_statement.query(args)
|
||||
statement_with_retry &.query(args)
|
||||
end
|
||||
|
||||
# builds a statement over a real connection
|
||||
# the conneciton is registered in `@connections`
|
||||
private def get_statement : Statement
|
||||
private def build_statement
|
||||
# TODO closed connections should be removed from @connections
|
||||
# either by callbacks or by week references.
|
||||
conn, existing = @db.checkout_some(@connections)
|
||||
@connections << conn unless existing
|
||||
conn.prepare(@query)
|
||||
end
|
||||
|
||||
private def statement_with_retry
|
||||
@db.retry do
|
||||
return yield build_statement
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue