diff --git a/spec/pool_spec.cr b/spec/pool_spec.cr new file mode 100644 index 0000000..ebaee5e --- /dev/null +++ b/spec/pool_spec.cr @@ -0,0 +1,166 @@ +require "./spec_helper" + +class ShouldSleepingOp + @is_sleeping = false + getter is_sleeping + getter sleep_happened + + def initialize + @sleep_happened = Channel(Nil).new + end + + def should_sleep + s = self + @is_sleeping = true + spawn do + sleep 0.1 + s.is_sleeping.should be_true + s.sleep_happened.send(nil) + end + yield + @is_sleeping = false + end + + def wait_for_sleep + @sleep_happened.receive + end +end + +class WaitFor + def initialize + @channel = Channel(Nil).new + end + + def wait + @channel.receive + end + + def check + @channel.send(nil) + end +end + +class Closable + include DB::Disposable + + protected def do_close + end +end + +describe DB::Pool do + it "should use proc to create objects" do + block_called = 0 + pool = DB::Pool.new(->{ block_called += 1; Closable.new }, initial_pool_size: 3) + block_called.should eq(3) + end + + it "should get resource" do + pool = DB::Pool.new(->{ Closable.new }) + pool.checkout.should be_a Closable + end + + it "should wait for available resource" do + pool = DB::Pool.new(->{ Closable.new }, max_pool_size: 1, initial_pool_size: 1) + + b_cnn_request = ShouldSleepingOp.new + wait_a = WaitFor.new + wait_b = WaitFor.new + + spawn do + a_cnn = pool.checkout + b_cnn_request.wait_for_sleep + pool.release a_cnn + + wait_a.check + end + + spawn do + b_cnn_request.should_sleep do + pool.checkout + end + + wait_b.check + end + + wait_a.wait + wait_b.wait + end + + it "should create new if max was not reached" do + block_called = 0 + pool = DB::Pool.new(->{ block_called += 1; Closable.new }, max_pool_size: 2, initial_pool_size: 1) + block_called.should eq 1 + pool.checkout + block_called.should eq 1 + pool.checkout + block_called.should eq 2 + end + + it "should reuse returned resources" do + all = [] of Closable + pool = DB::Pool.new(->{ Closable.new.tap { |c| all << c } }, max_pool_size: 2, initial_pool_size: 1) + pool.checkout + b1 = pool.checkout + pool.release b1 + b2 = pool.checkout + + b1.should eq b2 + all.size.should eq 2 + end + + it "should close available and total" do + all = [] of Closable + pool = DB::Pool.new(->{ Closable.new.tap { |c| all << c } }, max_pool_size: 2, initial_pool_size: 1) + a = pool.checkout + b = pool.checkout + pool.release b + all.size.should eq 2 + + all[0].closed?.should be_false + all[1].closed?.should be_false + pool.close + all[0].closed?.should be_true + all[1].closed?.should be_true + end + + it "should timeout" do + pool = DB::Pool.new(->{ Closable.new }, max_pool_size: 1, checkout_timeout: 0.1) + pool.checkout + expect_raises DB::PoolTimeout do + pool.checkout + end + end + + it "should close if max idle amount is reached" do + all = [] of Closable + pool = DB::Pool.new(->{ Closable.new.tap { |c| all << c } }, max_pool_size: 3, max_idle_pool_size: 1) + pool.checkout + pool.checkout + pool.checkout + + all.size.should eq 3 + all.any?(&.closed?).should be_false + pool.release all[0] + + all.any?(&.closed?).should be_false + pool.release all[1] + + all[0].closed?.should be_false + all[1].closed?.should be_true + all[2].closed?.should be_false + end + + it "should create resource after max_pool was reached if idle forced some close up" do + all = [] of Closable + pool = DB::Pool.new(->{ Closable.new.tap { |c| all << c } }, max_pool_size: 3, max_idle_pool_size: 1) + pool.checkout + pool.checkout + pool.checkout + pool.release all[0] + pool.release all[1] + pool.checkout + pool.checkout + + all.size.should eq 4 + end +end diff --git a/src/db.cr b/src/db.cr index 3541099..210176f 100644 --- a/src/db.cr +++ b/src/db.cr @@ -119,6 +119,7 @@ module DB end end +require "./db/pool" require "./db/query_methods" require "./db/disposable" require "./db/database" diff --git a/src/db/error.cr b/src/db/error.cr index 7fb85af..eb1fdd4 100644 --- a/src/db/error.cr +++ b/src/db/error.cr @@ -1,9 +1,10 @@ module DB - class Error < Exception end class MappingException < Exception end - + + class PoolTimeout < Error + end end diff --git a/src/db/pool.cr b/src/db/pool.cr new file mode 100644 index 0000000..a259ccd --- /dev/null +++ b/src/db/pool.cr @@ -0,0 +1,115 @@ +module DB + class Pool(T) + @initial_pool_size : Int32 + # maximum amount of objects in the pool. Either available or in use. + @max_pool_size : Int32 + @available = Set(T).new + @total = [] of T + @checkout_timeout : Float64 + + def initialize(@factory : Proc(T), @initial_pool_size = 1, @max_pool_size = 1, @max_idle_pool_size = 1, @checkout_timeout = 5.0) + @initial_pool_size.times { build_resource } + + @availability_channel = Channel(Nil).new + @waiting_resource = 0 + end + + # close all resources in the pool + def close : Nil + @total.each &.close + end + + def checkout : T + resource = if @available.empty? + if can_increase_pool + build_resource + else + wait_for_available + pick_available + end + else + pick_available + end + + @available.delete resource + resource + end + + def release(resource : T) : Nil + if can_increase_idle_pool + @available << resource + @availability_channel.send nil if @waiting_resource > 0 + else + resource.close + @total.delete(resource) + end + end + + private def build_resource : T + resource = @factory.call + @total << resource + @available << resource + resource + end + + private def can_increase_pool + @total.size < @max_pool_size + end + + private def can_increase_idle_pool + @available.size < @max_idle_pool_size + end + + private def pick_available + @available.first + end + + private def wait_for_available + timeout = TimeoutHelper.new(@checkout_timeout.to_f64, ->{ @availability_channel.send nil }) + @waiting_resource += 1 + + timeout.start + # if there are no available resources, sleep until one is available + @availability_channel.receive + timeout.raise_if_reached + + # double check there is something available to be checkedout + while @available.empty? + @availability_channel.receive + timeout.raise_if_reached + end + + timeout.cancel + @waiting_resource -= 1 + end + + class TimeoutHelper + def initialize(@timeout : Float64, @tick : Proc(Nil)) + @abort_timeout = false + @should_timeout = false + end + + def start + spawn do + sleep @timeout + unless @abort_timeout + @should_timeout = true + @tick.call + end + end + end + + def cancel + @abort_timeout = true + end + + def timeout_reached? + @should_timeout + end + + def raise_if_reached + raise DB::PoolTimeout.new if timeout_reached? + end + end + end +end