mirror of
https://gitea.invidious.io/iv-org/shard-crystal-db.git
synced 2024-08-15 00:53:32 +00:00
resource pool implementation with
* max_pool_size * initial_pool_size * max_idle_pool_size * checkout_timeout configuration options
This commit is contained in:
parent
8a913d1ef2
commit
421996b952
4 changed files with 285 additions and 2 deletions
166
spec/pool_spec.cr
Normal file
166
spec/pool_spec.cr
Normal file
|
@ -0,0 +1,166 @@
|
||||||
|
require "./spec_helper"
|
||||||
|
|
||||||
|
class ShouldSleepingOp
|
||||||
|
@is_sleeping = false
|
||||||
|
getter is_sleeping
|
||||||
|
getter sleep_happened
|
||||||
|
|
||||||
|
def initialize
|
||||||
|
@sleep_happened = Channel(Nil).new
|
||||||
|
end
|
||||||
|
|
||||||
|
def should_sleep
|
||||||
|
s = self
|
||||||
|
@is_sleeping = true
|
||||||
|
spawn do
|
||||||
|
sleep 0.1
|
||||||
|
s.is_sleeping.should be_true
|
||||||
|
s.sleep_happened.send(nil)
|
||||||
|
end
|
||||||
|
yield
|
||||||
|
@is_sleeping = false
|
||||||
|
end
|
||||||
|
|
||||||
|
def wait_for_sleep
|
||||||
|
@sleep_happened.receive
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class WaitFor
|
||||||
|
def initialize
|
||||||
|
@channel = Channel(Nil).new
|
||||||
|
end
|
||||||
|
|
||||||
|
def wait
|
||||||
|
@channel.receive
|
||||||
|
end
|
||||||
|
|
||||||
|
def check
|
||||||
|
@channel.send(nil)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class Closable
|
||||||
|
include DB::Disposable
|
||||||
|
|
||||||
|
protected def do_close
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe DB::Pool do
|
||||||
|
it "should use proc to create objects" do
|
||||||
|
block_called = 0
|
||||||
|
pool = DB::Pool.new(->{ block_called += 1; Closable.new }, initial_pool_size: 3)
|
||||||
|
block_called.should eq(3)
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should get resource" do
|
||||||
|
pool = DB::Pool.new(->{ Closable.new })
|
||||||
|
pool.checkout.should be_a Closable
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should wait for available resource" do
|
||||||
|
pool = DB::Pool.new(->{ Closable.new }, max_pool_size: 1, initial_pool_size: 1)
|
||||||
|
|
||||||
|
b_cnn_request = ShouldSleepingOp.new
|
||||||
|
wait_a = WaitFor.new
|
||||||
|
wait_b = WaitFor.new
|
||||||
|
|
||||||
|
spawn do
|
||||||
|
a_cnn = pool.checkout
|
||||||
|
b_cnn_request.wait_for_sleep
|
||||||
|
pool.release a_cnn
|
||||||
|
|
||||||
|
wait_a.check
|
||||||
|
end
|
||||||
|
|
||||||
|
spawn do
|
||||||
|
b_cnn_request.should_sleep do
|
||||||
|
pool.checkout
|
||||||
|
end
|
||||||
|
|
||||||
|
wait_b.check
|
||||||
|
end
|
||||||
|
|
||||||
|
wait_a.wait
|
||||||
|
wait_b.wait
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should create new if max was not reached" do
|
||||||
|
block_called = 0
|
||||||
|
pool = DB::Pool.new(->{ block_called += 1; Closable.new }, max_pool_size: 2, initial_pool_size: 1)
|
||||||
|
block_called.should eq 1
|
||||||
|
pool.checkout
|
||||||
|
block_called.should eq 1
|
||||||
|
pool.checkout
|
||||||
|
block_called.should eq 2
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should reuse returned resources" do
|
||||||
|
all = [] of Closable
|
||||||
|
pool = DB::Pool.new(->{ Closable.new.tap { |c| all << c } }, max_pool_size: 2, initial_pool_size: 1)
|
||||||
|
pool.checkout
|
||||||
|
b1 = pool.checkout
|
||||||
|
pool.release b1
|
||||||
|
b2 = pool.checkout
|
||||||
|
|
||||||
|
b1.should eq b2
|
||||||
|
all.size.should eq 2
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should close available and total" do
|
||||||
|
all = [] of Closable
|
||||||
|
pool = DB::Pool.new(->{ Closable.new.tap { |c| all << c } }, max_pool_size: 2, initial_pool_size: 1)
|
||||||
|
a = pool.checkout
|
||||||
|
b = pool.checkout
|
||||||
|
pool.release b
|
||||||
|
all.size.should eq 2
|
||||||
|
|
||||||
|
all[0].closed?.should be_false
|
||||||
|
all[1].closed?.should be_false
|
||||||
|
pool.close
|
||||||
|
all[0].closed?.should be_true
|
||||||
|
all[1].closed?.should be_true
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should timeout" do
|
||||||
|
pool = DB::Pool.new(->{ Closable.new }, max_pool_size: 1, checkout_timeout: 0.1)
|
||||||
|
pool.checkout
|
||||||
|
expect_raises DB::PoolTimeout do
|
||||||
|
pool.checkout
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should close if max idle amount is reached" do
|
||||||
|
all = [] of Closable
|
||||||
|
pool = DB::Pool.new(->{ Closable.new.tap { |c| all << c } }, max_pool_size: 3, max_idle_pool_size: 1)
|
||||||
|
pool.checkout
|
||||||
|
pool.checkout
|
||||||
|
pool.checkout
|
||||||
|
|
||||||
|
all.size.should eq 3
|
||||||
|
all.any?(&.closed?).should be_false
|
||||||
|
pool.release all[0]
|
||||||
|
|
||||||
|
all.any?(&.closed?).should be_false
|
||||||
|
pool.release all[1]
|
||||||
|
|
||||||
|
all[0].closed?.should be_false
|
||||||
|
all[1].closed?.should be_true
|
||||||
|
all[2].closed?.should be_false
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should create resource after max_pool was reached if idle forced some close up" do
|
||||||
|
all = [] of Closable
|
||||||
|
pool = DB::Pool.new(->{ Closable.new.tap { |c| all << c } }, max_pool_size: 3, max_idle_pool_size: 1)
|
||||||
|
pool.checkout
|
||||||
|
pool.checkout
|
||||||
|
pool.checkout
|
||||||
|
pool.release all[0]
|
||||||
|
pool.release all[1]
|
||||||
|
pool.checkout
|
||||||
|
pool.checkout
|
||||||
|
|
||||||
|
all.size.should eq 4
|
||||||
|
end
|
||||||
|
end
|
|
@ -119,6 +119,7 @@ module DB
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
require "./db/pool"
|
||||||
require "./db/query_methods"
|
require "./db/query_methods"
|
||||||
require "./db/disposable"
|
require "./db/disposable"
|
||||||
require "./db/database"
|
require "./db/database"
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
module DB
|
module DB
|
||||||
|
|
||||||
class Error < Exception
|
class Error < Exception
|
||||||
end
|
end
|
||||||
|
|
||||||
class MappingException < Exception
|
class MappingException < Exception
|
||||||
end
|
end
|
||||||
|
|
||||||
|
class PoolTimeout < Error
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
115
src/db/pool.cr
Normal file
115
src/db/pool.cr
Normal file
|
@ -0,0 +1,115 @@
|
||||||
|
module DB
|
||||||
|
class Pool(T)
|
||||||
|
@initial_pool_size : Int32
|
||||||
|
# maximum amount of objects in the pool. Either available or in use.
|
||||||
|
@max_pool_size : Int32
|
||||||
|
@available = Set(T).new
|
||||||
|
@total = [] of T
|
||||||
|
@checkout_timeout : Float64
|
||||||
|
|
||||||
|
def initialize(@factory : Proc(T), @initial_pool_size = 1, @max_pool_size = 1, @max_idle_pool_size = 1, @checkout_timeout = 5.0)
|
||||||
|
@initial_pool_size.times { build_resource }
|
||||||
|
|
||||||
|
@availability_channel = Channel(Nil).new
|
||||||
|
@waiting_resource = 0
|
||||||
|
end
|
||||||
|
|
||||||
|
# close all resources in the pool
|
||||||
|
def close : Nil
|
||||||
|
@total.each &.close
|
||||||
|
end
|
||||||
|
|
||||||
|
def checkout : T
|
||||||
|
resource = if @available.empty?
|
||||||
|
if can_increase_pool
|
||||||
|
build_resource
|
||||||
|
else
|
||||||
|
wait_for_available
|
||||||
|
pick_available
|
||||||
|
end
|
||||||
|
else
|
||||||
|
pick_available
|
||||||
|
end
|
||||||
|
|
||||||
|
@available.delete resource
|
||||||
|
resource
|
||||||
|
end
|
||||||
|
|
||||||
|
def release(resource : T) : Nil
|
||||||
|
if can_increase_idle_pool
|
||||||
|
@available << resource
|
||||||
|
@availability_channel.send nil if @waiting_resource > 0
|
||||||
|
else
|
||||||
|
resource.close
|
||||||
|
@total.delete(resource)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
private def build_resource : T
|
||||||
|
resource = @factory.call
|
||||||
|
@total << resource
|
||||||
|
@available << resource
|
||||||
|
resource
|
||||||
|
end
|
||||||
|
|
||||||
|
private def can_increase_pool
|
||||||
|
@total.size < @max_pool_size
|
||||||
|
end
|
||||||
|
|
||||||
|
private def can_increase_idle_pool
|
||||||
|
@available.size < @max_idle_pool_size
|
||||||
|
end
|
||||||
|
|
||||||
|
private def pick_available
|
||||||
|
@available.first
|
||||||
|
end
|
||||||
|
|
||||||
|
private def wait_for_available
|
||||||
|
timeout = TimeoutHelper.new(@checkout_timeout.to_f64, ->{ @availability_channel.send nil })
|
||||||
|
@waiting_resource += 1
|
||||||
|
|
||||||
|
timeout.start
|
||||||
|
# if there are no available resources, sleep until one is available
|
||||||
|
@availability_channel.receive
|
||||||
|
timeout.raise_if_reached
|
||||||
|
|
||||||
|
# double check there is something available to be checkedout
|
||||||
|
while @available.empty?
|
||||||
|
@availability_channel.receive
|
||||||
|
timeout.raise_if_reached
|
||||||
|
end
|
||||||
|
|
||||||
|
timeout.cancel
|
||||||
|
@waiting_resource -= 1
|
||||||
|
end
|
||||||
|
|
||||||
|
class TimeoutHelper
|
||||||
|
def initialize(@timeout : Float64, @tick : Proc(Nil))
|
||||||
|
@abort_timeout = false
|
||||||
|
@should_timeout = false
|
||||||
|
end
|
||||||
|
|
||||||
|
def start
|
||||||
|
spawn do
|
||||||
|
sleep @timeout
|
||||||
|
unless @abort_timeout
|
||||||
|
@should_timeout = true
|
||||||
|
@tick.call
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def cancel
|
||||||
|
@abort_timeout = true
|
||||||
|
end
|
||||||
|
|
||||||
|
def timeout_reached?
|
||||||
|
@should_timeout
|
||||||
|
end
|
||||||
|
|
||||||
|
def raise_if_reached
|
||||||
|
raise DB::PoolTimeout.new if timeout_reached?
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
Loading…
Reference in a new issue