Compare commits

..

50 commits

Author SHA1 Message Date
Lawrence Wakefield
532ae075bd
Allow Enumerable for query args instead of Array (#207) 2024-07-03 15:46:40 +02:00
Brian J. Cardiff
3eaac85a5d
Release 0.13.1 (#203) 2023-12-21 09:53:14 -03:00
Brian J. Cardiff
1d0105ffeb
Gracefully allow spec helper to fail on older crystal (#202) 2023-12-17 20:03:45 -03:00
Brian J. Cardiff
26599a740f
Update CHANGELOG.md 2023-12-11 19:31:47 -03:00
Brian J. Cardiff
7fff589e02
Release 0.13.0 (#201) 2023-12-11 19:11:43 -03:00
Brian J. Cardiff
c106775ea9
Simplifications and performance improvements (#200)
* Add pool_concurrency_test manual spec

Add MT connection count without Mutex

* Drop checkout_some, simpler pool_prepared statement

* Make pool statement a struct

* Drop StringKeyCache mutex

The StringKeyCache is now only used inside a connection. It's assumed that connections are not used concurrently with multiple queries.

* Drop do_close in pool statements

* Add specs and update comment

* Fix typo
2023-12-08 19:06:41 -03:00
Lachlan Dowding
06df272740
Add exception cause support to PoolResourceLost and ConnectionLost (#199)
constructors
2023-11-30 09:36:00 -03:00
Brian J. Cardiff
d3dd978e24
Allow statements to auto close when consumed if no cache (#198) 2023-11-29 18:39:44 -03:00
Johannes Müller
76d8bb6a6e
Deprecate DB.mapping (#196) 2023-11-13 17:44:39 +01:00
Johannes Müller
340b6e4b9a
Add reference to DB::Serializable in docs (#197) 2023-11-09 08:22:51 -03:00
Brian J. Cardiff
285e865e3a
Allow prepared_statements_cache=false option to disable prepared statements cache (#194)
* Add prepared_statements_cache in connection to opt-out

* Honor prepared_statements_cache option in database also
2023-11-03 23:04:14 -03:00
wonderix
9b52a65752 Add link to crystal-tds 2023-11-02 18:45:45 -03:00
Lachlan Dowding
a527cfdc4e
Fix DB::DriverSpecs#with_db connection_string query param support (#192) 2023-10-31 10:10:25 -03:00
Brian J. Cardiff
38faf7eeba
Update docs regarding ConnectionBuilder (#188) 2023-08-14 21:11:35 -03:00
Brian J. Cardiff
9471b33ffe
Fix max_idle_pool_size race condition (#186)
* Add Fiber.yield to dummy driver to mimic real drivers IO

* Add manual load test file

* Fix race condition

* Drop unused code

* Less state, less bugs

* Update spec/manual/load_test.cr

Co-authored-by: Beta Ziliani <beta@manas.tech>

---------

Co-authored-by: Beta Ziliani <beta@manas.tech>
2023-07-31 11:04:18 -03:00
Jamie Gaskins
ce95cd2257
Decrement the inflight counter on ConnectionRefused (#184) 2023-07-10 10:55:35 -03:00
Brian J. Cardiff
851091e81c
Release 0.12.0 (#183)
* Release 0.12.0

* Update CHANGELOG.md

Co-authored-by: Johannes Müller <straightshoota@gmail.com>

---------

Co-authored-by: Johannes Müller <straightshoota@gmail.com>
2023-06-23 13:48:41 -03:00
Brian J. Cardiff
f13846b133
Refactor connection factory (#181)
* Start moving out URI from ConnectionContext

Create connections with an initial context. Database will set itself as context after connection has been created

* Migrate to simpler/decoupled factory in driver

This allows more freedom on how the connection is created. It will no longer need to have an explicit reference to the connection URI

* Introduce DB::Connection::Options

Move prepared_statements out from ConnectionContext

* Delegate options parsing to driver

DRY parsing connection options for database

* Introduce DB::Pool::Options

* Rename Driver#connection_pool_options to pool_options

* Drop driver getter from database

* Drop uri getter from database

* Add public Database#initialize method

* Drop :nodoc: Database#initialize

* Pass spec helper explicitly (to access methods within each spec)

* Update docs

* Update src/db/pool.cr

Co-authored-by: Beta Ziliani <beta@manas.tech>

* Use ConnectionBuilder instead of procs

* Fix inferred type when there is a single concrete connection type

* Update src/db/driver.cr

Co-authored-by: Beta Ziliani <beta@manas.tech>

---------

Co-authored-by: Beta Ziliani <beta@manas.tech>
2023-06-22 22:03:08 -03:00
Jamie Gaskins
65b926c926
Use https link for API docs (#180) 2023-05-29 17:47:21 -03:00
Brian J. Cardiff
da7494b5ba
Fix mt issues (#178) 2023-04-24 12:26:25 +02:00
Amaury Trujillo
87dc8aafaf
Add DuckDB driver to README (#172) 2022-10-27 18:36:46 +02:00
Jamie Gaskins
e076a08cd0
Close a transaction when returning from within its block (#167) 2022-10-27 18:35:57 +02:00
Jamie Gaskins
167b55966e
Allow the use of enums (#168) 2022-10-26 16:06:14 +02:00
Johannes Müller
07c68d38e4
Fix positional parameter warnings (#173) 2022-10-20 20:48:21 +02:00
Beta Ziliani
3e9ed7a304
Merge pull request #163 from crystal-lang/fix/crystal-1.4 2022-03-25 14:59:20 -03:00
Johannes Müller
c8a0849423
Fix specs for Crystal 1.4.0
The format of string conversion error messages was changed in
https://github.com/crystal-lang/crystal#11883
2022-03-23 21:25:21 +01:00
Brian J. Cardiff
e3f1a308b4
Release 0.11.0 (#157) 2022-01-27 10:56:35 -03:00
Brian J. Cardiff
27ade07359
Add workaround for crystal-lang/crystal#9483 (#160)
* Add workaround for crystal-lang/crystal#9483

* Update src/db/begin_transaction.cr

Co-authored-by: Johannes Müller <straightshoota@gmail.com>

Co-authored-by: Johannes Müller <straightshoota@gmail.com>
2022-01-22 15:06:16 -03:00
Brian J. Cardiff
d829b07b01
Returns block's value on #transaction method (#159)
* Returns block's value on #transaction method

* Update src/db/begin_transaction.cr

Co-authored-by: Johannes Müller <straightshoota@gmail.com>

Co-authored-by: Johannes Müller <straightshoota@gmail.com>
2022-01-21 10:57:14 -03:00
Johannes Müller
5a7d27e0c5
Improve DB::MappingException usage (#129) 2021-10-12 20:51:53 -03:00
Jeremy Woertink
b1299fcada
Raise a specific class error instead of string literal (#156)
* Raise a specific class error instead of string literal when the type returned doesn't match the type expected. Allows for drivers to catch the specific error.

* Add ResultSet#next_column_index

* Add shared specs for next_column_index

* Add properties to ColumnTypeMismatchError

* Add shared specs for ColumnTypeMismatchError

* Fix specs

Co-authored-by: Brian J. Cardiff <bcardiff@gmail.com>
2021-10-12 20:49:26 -03:00
Stephen von Takach
a25f33611c
feat(error): close resources on resource lost (#155) 2021-09-10 08:36:01 -03:00
Stephen von Takach
6dc3f2dd6f
fix(pool): returning closed resources to the pool (#154) 2021-09-06 19:02:43 -03:00
Duke Nguyen
bf5ca75d1a
Fix model.from_rs argument type typo (#142) 2021-08-05 00:12:59 -03:00
Johannes Müller
3a53a69f83
Update Github Actions config (#152)
Add explicit 1.0.0 to version matrix.

Formatter check should only run on latest.
2021-06-21 16:53:14 +02:00
Oleh Prypin
52bd5b0a86
Migrate continuous testing to GitHub Actions (#147)
* crystal tool format

* Migrate continuous testing to GitHub Actions
2021-06-08 20:52:00 -03:00
Brian J. Cardiff
0415deebbb
Release 0.10.1 (#149) 2021-03-22 17:18:00 -03:00
Jamie Gaskins
eaddae7d71
Add docs for DB::Database#setup_connection (#139) 2020-10-27 12:55:06 -03:00
Brian J. Cardiff
bd45602864
Release 0.10.0 (#136) 2020-09-30 10:37:37 -03:00
Brian J. Cardiff
284145138f
Bump required Crystal to 0.35.0 (#135)
In order to use logging we need 0.35.0
2020-09-29 10:35:14 -03:00
Brian J. Cardiff
7253551849
Add logging for executing queries (#134)
* Add logging for executing queries

Arguments are translated to Log::Metadata::Value via DB::Statement#arg_to_log method.

DB::Statement#before_query_or_exec & after_query_or_exec protected methods can be used to hook and run around the statement execution

* Move the metadata converter to a module

* Replace before/after with def_around_query_or_exec macro

* Update src/db/enumerable_concat.cr

Co-authored-by: Ary Borenszweig <asterite@gmail.com>

Co-authored-by: Ary Borenszweig <asterite@gmail.com>
2020-09-25 14:49:50 -03:00
Johannes Müller
fad9e70353
Fix mutex deadlock in setup_connection (#128)
Adds auto_release = false in setup_connection to avoid trying to release
the connection to the pool before it has been added.
2020-09-14 10:55:18 -03:00
Jamie Gaskins
291b65b853
Allow DB::Pool to be used a generic connection pool (#131)
* Allow DB::Pool to be a generic connection pool

* Use fully qualified class name for consistency

Co-authored-by: Brian J. Cardiff <bcardiff@gmail.com>

* Wrap only the necessary code in an `ensure`

* Add spec for http client pool

* Fix ICE in crystal-sqlite3

Co-authored-by: Brian J. Cardiff <bcardiff@gmail.com>
2020-09-14 10:49:00 -03:00
Brian J. Cardiff
ed686ad301
Release 0.9.0 (#126) 2020-04-06 20:11:41 -03:00
Brian J. Cardiff
511fe20253
Use DB::NoResultsError on QueryMethods (#125) 2020-04-06 18:36:37 -03:00
Brian J. Cardiff
7543908733
Merge pull request #124 from crystal-lang/crystal/0.34.0
Crystal 0.34.0 support
2020-04-06 18:14:40 -03:00
Brian J. Cardiff
f4b298d3a5 Use select / timeout in Crystal 0.34 2020-04-02 12:34:27 -03:00
Brian J. Cardiff
af200eac54 Comply exhaustive case check
Comply exhaustive case check
2020-04-02 12:34:27 -03:00
Jeremy Woertink
f08a4da60a
Add NoResultsError (#121) 2020-02-17 22:04:23 +01:00
Brian J. Cardiff
f128d6ddf3
Update README.md and docs for dynamic args (#117) 2020-01-14 10:29:54 -03:00
40 changed files with 1336 additions and 358 deletions

33
.github/workflows/ci.yml vendored Normal file
View file

@ -0,0 +1,33 @@
name: CI
on:
push:
pull_request:
branches: [master]
schedule:
- cron: '0 6 * * 1' # Every monday 6 AM
jobs:
test:
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest]
crystal: [1.0.0, latest, nightly]
runs-on: ${{ matrix.os }}
steps:
- name: Install Crystal
uses: oprypin/install-crystal@v1
with:
crystal: ${{ matrix.crystal }}
- name: Download source
uses: actions/checkout@v2
- name: Run specs
run: crystal spec
- name: Check formatting
run: crystal tool format; git diff --exit-code
if: matrix.crystal == 'latest' && matrix.os == 'ubuntu-latest'

View file

@ -1,7 +0,0 @@
language: crystal
crystal:
- latest
- nightly
script:
- crystal spec
- crystal tool format --check

View file

@ -1,3 +1,68 @@
## v0.13.1 (2023-12-21)
* Gracefully allow spec helper to fail on older crystal. ([#202](https://github.com/crystal-lang/crystal-db/pull/202), thanks @bcardiff)
## v0.13.0 (2023-12-11)
* **(breaking-change)** Deprecate `DB.mapping`. ([#196](https://github.com/crystal-lang/crystal-db/pull/196), thanks @straight-shoota)
* **(breaking-change)** Drop `Pool#checkout_some`, make `PoolStatement` a struct. ([#200](https://github.com/crystal-lang/crystal-db/pull/200), thanks @bcardiff)
* Simplifications and performance improvements on pool statements. ([#200](https://github.com/crystal-lang/crystal-db/pull/200), thanks @bcardiff)
* Allow `prepared_statements_cache=false`` option to disable prepared statements cache. ([#194](https://github.com/crystal-lang/crystal-db/pull/194), [#198](https://github.com/crystal-lang/crystal-db/pull/198), thanks @bcardiff)
* Add exception cause support to `PoolResourceLost` and `ConnectionLost` constructors. ([#199](https://github.com/crystal-lang/crystal-db/pull/199), thanks @lachlan)
* Fix inflight counter on `ConnectionRefused`. ([#184](https://github.com/crystal-lang/crystal-db/pull/184), thanks @jgaskins)
* Fix max_idle_pool_size race condition. ([#186](https://github.com/crystal-lang/crystal-db/pull/186), thanks @bcardiff)
* Fix `DB::DriverSpecs#with_db` `connection_string` query param support. ([#192](https://github.com/crystal-lang/crystal-db/pull/192), thanks @lachlan)
* Update docs regarding `ConnectionBuilder`. ([#188](https://github.com/crystal-lang/crystal-db/pull/188), thanks @bcardiff)
* Add reference to `DB::Serializable` in docs. ([#197](https://github.com/crystal-lang/crystal-db/pull/197), thanks @straight-shoota)
* Add link to crystal-tds. ([#193](https://github.com/crystal-lang/crystal-db/pull/193), thanks @wonderix)
### Notes for driver implementors
* Use new constructors to preserve the underlying reason of a `PoolResourceLost` or `ConnectionLost` constructors (See [#199](https://github.com/crystal-lang/crystal-db/pull/199))
## v0.12.0 (2023-06-23)
- **(breaking-change)** Refactor how drivers create connections. Allow creating `Database` without `URI`s. ([#181](https://github.com/crystal-lang/crystal-db/pull/181), thanks @bcardiff)
- Close a transaction when `return`ing from within its block. ([#167](https://github.com/crystal-lang/crystal-db/pull/167), thanks @jgaskins)
- Fix race conditions in multi-thread mode. ([#178](https://github.com/crystal-lang/crystal-db/pull/178), thanks @bcardiff)
- Allow the use of `Enum`s when reading a `ResultSet`. ([#168](https://github.com/crystal-lang/crystal-db/pull/168), thanks @jgaskins)
- Fix specs for Crystal 1.4 and 1.5. ([#163](https://github.com/crystal-lang/crystal-db/pull/163), [#173](https://github.com/crystal-lang/crystal-db/pull/173), thanks @straight-shoota)
- Update README. ([#172](https://github.com/crystal-lang/crystal-db/pull/172), [#180](https://github.com/crystal-lang/crystal-db/pull/180), thanks @amauryt, @jgaskins)
Note: The breaking-change introduced in this release does not affect consumers of the library, only driver implementors.
## v0.11.0 (2022-01-27)
* Fix `Connection#transaction` method to return the block value as the result. ([#159](https://github.com/crystal-lang/crystal-db/pull/159), [#160](https://github.com/crystal-lang/crystal-db/pull/160), thanks @bcardiff)
* Add `DB::ColumnTypeMismatchError` error with column and type information. ([#156](https://github.com/crystal-lang/crystal-db/pull/156), thanks @jwoertink, @bcardiff)
* Improve `DB::MappingException` error. ([#129](https://github.com/crystal-lang/crystal-db/pull/129), thanks @straight-shoota)
* Close connection resource when connection is lost. ([#155](https://github.com/crystal-lang/crystal-db/pull/155), thanks @stakach, @bcardiff)
* Discard closed connections in the pool when they are returned. ([#154](https://github.com/crystal-lang/crystal-db/pull/154), thanks @stakach)
* Fix typo in `Mode.from_rs` argument type. ([#142](https://github.com/crystal-lang/crystal-db/pull/142), thanks @dukeraphaelng)
* Migrate CI to GitHub Actions. ([#147](https://github.com/crystal-lang/crystal-db/pull/147), [#152](https://github.com/crystal-lang/crystal-db/pull/152), thanks @oprypin, thanks @straight-shoota)
This release requires Crystal 1.0.0 or later.
Note: For drivers implementations [#156](https://github.com/crystal-lang/crystal-db/pull/156) adds a `abstract def next_column_index : Int32` to `ResultSet` so there is a breaking-change that does not affect consumers of the library.
## v0.10.1 (2021-03-22)
* Add docs for `DB::Database#setup_connection` ([#139](https://github.com/crystal-lang/crystal-db/pull/139), thanks @jgaskins)
## v0.10.0 (2020-09-30)
* Fix mutex deadlock in setup_connection. ([#128](https://github.com/crystal-lang/crystal-db/pull/128), thanks @straight-shoota)
* Add logging for executing queries. ([#134](https://github.com/crystal-lang/crystal-db/pull/134), thanks @bcardiff)
* Allow `DB::Pool` to be used a generic connection pool. ([#131](https://github.com/crystal-lang/crystal-db/pull/131), thanks @jgaskins)
This release requires Crystal 0.35.0 or later.
## v0.9.0 (2020-04-06)
* Fix compatibility issues for Crystal 0.34.0. ([#124](https://github.com/crystal-lang/crystal-db/pull/124), thanks @bcardiff)
* Fix readme sample. ([#117](https://github.com/crystal-lang/crystal-db/pull/117), thanks @bcardiff)
* Raise `DB::NoResultsError` when trying to get data from an empty result ([#121](https://github.com/crystal-lang/crystal-db/pull/121), [#125](https://github.com/crystal-lang/crystal-db/pull/125), thanks @jwoertink, @bcardiff)
## v0.8.0 (2019-12-11)
* Add `DB::Serializable`. ([#115](https://github.com/crystal-lang/crystal-db/pull/115), thanks @nickbclifford)

View file

@ -1,4 +1,4 @@
[![Build Status](https://travis-ci.org/crystal-lang/crystal-db.svg?branch=master)](https://travis-ci.org/crystal-lang/crystal-db)
[![Build Status](https://github.com/crystal-lang/crystal-db/workflows/CI/badge.svg)](https://github.com/crystal-lang/crystal-db/actions?query=workflow%3ACI+event%3Apush+branch%3Amaster)
# crystal-db
@ -7,7 +7,10 @@ Common db api for crystal. You will need to have a specific driver to access a d
* [SQLite](https://github.com/crystal-lang/crystal-sqlite3)
* [MySQL](https://github.com/crystal-lang/crystal-mysql)
* [PostgreSQL](https://github.com/will/crystal-pg)
* [ODBC](https://github.com/naqvis/crystal-odbc)
* [Cassandra](https://github.com/kaukas/crystal-cassandra)
* [DuckDB](https://github.com/amauryt/crystal-duckdb)
* [Microsoft SQL Server](https://github.com/wonderix/crystal-tds)
## Installation
@ -33,7 +36,7 @@ Note: Multiple drivers can be included in the same application.
## Documentation
* [Latest API](http://crystal-lang.github.io/crystal-db/api/latest/)
* [Latest API](https://crystal-lang.github.io/crystal-db/api/latest/)
* [Crystal book](https://crystal-lang.org/docs/database/)
## Usage
@ -54,7 +57,7 @@ DB.open "sqlite3:./file.db" do |db|
args = [] of DB::Any
args << "Sarah"
args << 33
db.exec "insert into contacts values (?, ?)", args
db.exec "insert into contacts values (?, ?)", args: args
puts "max age:"
puts db.scalar "select max(age) from contacts" # => 33
@ -81,7 +84,7 @@ Issues not yet addressed:
- [x] Data type extensibility. Allow each driver to extend the data types allowed.
- [x] Transactions & nested transactions. [#27](https://github.com/crystal-lang/crystal-db/pull/27)
- [x] Connection pool.
- [ ] Logging
- [x] Logging
- [ ] Direct access to `IO` to avoid memory allocation for blobs.
## Contributing

View file

@ -1,9 +1,9 @@
name: db
version: 0.8.0
version: 0.13.1
authors:
- Brian J. Cardiff <bcardiff@manas.tech>
- Brian J. Cardiff <bcardiff@gmail.com>
crystal: 0.25.0
crystal: ">= 1.0.0, < 2.0.0"
license: MIT

View file

@ -20,6 +20,10 @@ module GenericResultSet
@index += 1
@row[@index - 1]
end
def next_column_index : Int32
@index
end
end
class FooValue
@ -32,6 +36,15 @@ class FooValue
end
class FooDriver < DB::Driver
class FooConnectionBuilder < DB::ConnectionBuilder
def initialize(@options : DB::Connection::Options)
end
def build : DB::Connection
FooConnection.new(@options)
end
end
alias Any = DB::Any | FooValue
@@row = [] of Any
@ -43,13 +56,14 @@ class FooDriver < DB::Driver
@@row
end
def build_connection(context : DB::ConnectionContext) : DB::Connection
FooConnection.new(context)
def connection_builder(uri : URI) : DB::ConnectionBuilder
params = HTTP::Params.parse(uri.query || "")
FooConnectionBuilder.new(connection_options(params))
end
class FooConnection < DB::Connection
def build_prepared_statement(query) : DB::Statement
FooStatement.new(self)
FooStatement.new(self, query)
end
def build_unprepared_statement(query) : DB::Statement
@ -95,6 +109,15 @@ class BarValue
end
class BarDriver < DB::Driver
class BarConnectionBuilder < DB::ConnectionBuilder
def initialize(@options : DB::Connection::Options)
end
def build : DB::Connection
BarConnection.new(@options)
end
end
alias Any = DB::Any | BarValue
@@row = [] of Any
@ -106,13 +129,14 @@ class BarDriver < DB::Driver
@@row
end
def build_connection(context : DB::ConnectionContext) : DB::Connection
BarConnection.new(context)
def connection_builder(uri : URI) : DB::ConnectionBuilder
params = HTTP::Params.parse(uri.query || "")
BarConnectionBuilder.new(connection_options(params))
end
class BarConnection < DB::Connection
def build_prepared_statement(query) : DB::Statement
BarStatement.new(self)
BarStatement.new(self, query)
end
def build_unprepared_statement(query) : DB::Statement
@ -152,8 +176,8 @@ DB.register_driver "bar", BarDriver
describe DB do
it "should be able to register multiple drivers" do
DB.open("foo://host").driver.should be_a(FooDriver)
DB.open("bar://host").driver.should be_a(BarDriver)
DB.open("foo://host").checkout.should be_a(FooDriver::FooConnection)
DB.open("bar://host").checkout.should be_a(BarDriver::BarConnection)
end
it "Foo and Bar drivers should return fake_row" do
@ -197,7 +221,7 @@ describe DB do
FooDriver.fake_row = [1] of FooDriver::Any
db.query "query" do |rs|
rs.move_next
expect_raises(Exception, "FooResultSet#read returned a Int32. A BarValue was expected.") do
expect_raises(DB::ColumnTypeMismatchError, "In FooDriver::FooResultSet#read the column 0 returned a Int32 but a BarValue was expected.") do
w.check
rs.read(BarValue)
end
@ -210,7 +234,7 @@ describe DB do
BarDriver.fake_row = [1] of BarDriver::Any
db.query "query" do |rs|
rs.move_next
expect_raises(Exception, "BarResultSet#read returned a Int32. A FooValue was expected.") do
expect_raises(DB::ColumnTypeMismatchError, "In BarDriver::BarResultSet#read the column 0 returned a Int32 but a FooValue was expected.") do
w.check
rs.read(FooValue)
end

View file

@ -8,6 +8,7 @@ describe DB::Database do
db.setup_connection do |cnn|
cnn_setup += 1
cnn.scalar("a").should eq "a"
end
cnn_setup.should eq(2)
@ -56,14 +57,6 @@ describe DB::Database do
end
end
it "should close pool statements when closing db" do
stmt = uninitialized DB::PoolStatement
with_dummy do |db|
stmt = db.build("query1")
end
stmt.closed?.should be_true
end
it "should not reconnect if connection is lost and retry_attempts=0" do
DummyDriver::DummyConnection.clear_connections
DB.open "dummy://localhost:1027?initial_pool_size=1&max_pool_size=1&retry_attempts=0" do |db|
@ -171,6 +164,40 @@ describe DB::Database do
end
end
it "should close connection on ConnectionLost" do
DummyDriver::DummyConnection.clear_connections
DB.open "dummy://localhost:1027?initial_pool_size=1&max_pool_size=1&retry_attempts=1" do |db|
db.exec("stmt1")
DummyDriver::DummyConnection.connections.size.should eq(1)
connection = DummyDriver::DummyConnection.connections.first
connection.disconnect!
connection.closed?.should be_false
db.exec("stmt1")
# A new connection was used for the last statement
DummyDriver::DummyConnection.connections.size.should eq(2)
connection.closed?.should be_true
end
end
it "should not checkout multiple connections if there is a statement error" do
with_dummy "dummy://localhost:1027?initial_pool_size=1&max_pool_size=10&retry_attempts=10" do |db|
expect_raises DB::Error do
db.exec("syntax error")
end
DummyDriver::DummyConnection.connections.size.should eq(1)
end
end
it "should attempt all retries if connection is lost" do
with_dummy "dummy://localhost:1027?initial_pool_size=1&max_pool_size=1&retry_attempts=10" do |db|
expect_raises DB::PoolRetryAttemptsExceeded do
db.exec("raise ConnectionLost")
end
# 1 initial + 10 retries
DummyDriver::DummyConnection.connections.size.should eq(11)
end
end
describe "prepared_statements connection option" do
it "defaults to true" do
with_dummy "dummy://localhost:1027" do |db|

View file

@ -9,12 +9,9 @@ describe DB do
DB.driver_class("dummy").should eq(DummyDriver)
end
it "should instantiate driver with connection uri" do
it "should create dummy connection" do
db = DB.open "dummy://localhost:1027"
db.driver.should be_a(DummyDriver)
db.uri.scheme.should eq("dummy")
db.uri.host.should eq("localhost")
db.uri.port.should eq(1027)
db.checkout.should be_a(DummyDriver::DummyConnection)
end
it "should create a connection and close it" do

View file

@ -1,36 +1,69 @@
require "spec"
require "../src/db"
class DummyDriver < DB::Driver
def build_connection(context : DB::ConnectionContext) : DB::Connection
DummyConnection.new(context)
class DummyConnectionBuilder < DB::ConnectionBuilder
def initialize(@options : DB::Connection::Options)
end
def build : DB::Connection
DummyConnection.new(@options)
end
end
def connection_builder(uri : URI) : DB::ConnectionBuilder
params = HTTP::Params.parse(uri.query || "")
DummyConnectionBuilder.new(connection_options(params))
end
class DummyConnection < DB::Connection
def initialize(context)
super(context)
@@connections = [] of DummyConnection
@@connections_count = Atomic(Int32).new(0)
def initialize(options : DB::Connection::Options)
super(options)
Fiber.yield
@@connections_count.add(1)
@connected = true
@@connections ||= [] of DummyConnection
@@connections.not_nil! << self
{% unless flag?(:preview_mt) %}
# @@connections is only used in single-threaded mode in specs
# for benchmarks we want to avoid the overhead of synchronizing this array
@@connections << self
{% end %}
end
def self.connections_count
@@connections_count.get
end
def self.connections
@@connections.not_nil!
{% if flag?(:preview_mt) %}
raise "DummyConnection.connections is only available in single-threaded mode"
{% end %}
@@connections
end
def self.clear_connections
@@connections.try &.clear
{% if flag?(:preview_mt) %}
raise "DummyConnection.clear_connections is only available in single-threaded mode"
{% end %}
@@connections.clear
end
def build_prepared_statement(query) : DB::Statement
assert_not_closed!
DummyStatement.new(self, query, true)
end
def build_unprepared_statement(query) : DB::Statement
assert_not_closed!
DummyStatement.new(self, query, false)
end
def last_insert_id : Int64
assert_not_closed!
0
end
@ -43,12 +76,18 @@ class DummyDriver < DB::Driver
end
def create_transaction
assert_not_closed!
DummyTransaction.new(self)
end
protected def do_close
super
end
private def assert_not_closed!
raise "Statement is closed" if closed?
end
end
class DummyTransaction < DB::TopLevelTransaction
@ -94,24 +133,45 @@ class DummyDriver < DB::Driver
end
class DummyStatement < DB::Statement
@@statements_count = Atomic(Int32).new(0)
@@statements_exec_count = Atomic(Int32).new(0)
property params
def initialize(connection, @query : String, @prepared : Bool)
def initialize(connection, command : String, @prepared : Bool)
@params = Hash(Int32 | String, DB::Any | Array(DB::Any)).new
super(connection)
raise DB::Error.new(query) if query == "syntax error"
super(connection, command)
@@statements_count.add(1)
raise DB::Error.new(command) if command == "syntax error"
raise DB::ConnectionLost.new(connection) if command == "raise ConnectionLost"
end
def self.statements_count
@@statements_count.get
end
def self.statements_exec_count
@@statements_exec_count.get
end
protected def perform_query(args : Enumerable) : DB::ResultSet
assert_not_closed!
@@statements_exec_count.add(1)
Fiber.yield
@connection.as(DummyConnection).check
set_params args
DummyResultSet.new self, @query
DummyResultSet.new self, command
end
protected def perform_exec(args : Enumerable) : DB::ExecResult
assert_not_closed!
@@statements_exec_count.add(1)
@connection.as(DummyConnection).check
set_params args
raise DB::Error.new("forced exception due to query") if @query == "raise"
raise DB::Error.new("forced exception due to query") if command == "raise"
DB::ExecResult.new 0i64, 0_i64
end
@ -141,6 +201,10 @@ class DummyDriver < DB::Driver
protected def do_close
super
end
private def assert_not_closed!
raise "Statement is closed" if closed?
end
end
class DummyResultSet < DB::ResultSet
@ -149,9 +213,11 @@ class DummyDriver < DB::Driver
@@last_result_set : self?
def initialize(statement, query)
def initialize(statement, command)
super(statement)
@top_values = query.split.map { |r| r.split(',') }.to_a
Fiber.yield
@top_values = command.split.map { |r| r.split(',') }.to_a
@column_count = @top_values.size > 0 ? @top_values[0].size : 2
@@last_result_set = self
@ -187,7 +253,11 @@ class DummyDriver < DB::Driver
return (@statement.as(DummyStatement)).params[0]
end
return n
n.to_i64? || n
end
def next_column_index : Int32
@column_count - @values.not_nil!.size
end
def read(t : String.class)

View file

@ -126,7 +126,7 @@ describe DummyDriver do
it "raises if no rows" do
with_dummy do |db|
expect_raises(DB::Error, "no rows") do
expect_raises(DB::NoResultsError) do
db.query_one("") { }
end
end
@ -246,6 +246,8 @@ describe DummyDriver do
rs.read(Int64, Int64).should eq({3i64, 4i64})
when 1
rs.read(Int64, Int64).should eq({1i64, 2i64})
else
raise "unreachable"
end
i += 1
end

View file

@ -0,0 +1,52 @@
require "./spec_helper"
require "./support/http"
describe DB::Pool do
it "distributes evenly the requests" do
mutex = Mutex.new
requests_per_connection = Hash(Socket::Address, Int32).new
server = HTTP::Server.new do |context|
remote_address = context.request.remote_address.not_nil!
mutex.synchronize do
requests_per_connection[remote_address] ||= 0
requests_per_connection[remote_address] += 1
end
sleep context.request.query_params["delay"].to_f
context.response.print "ok"
end
address = server.bind_unused_port "127.0.0.1"
run_server(server) do
fixed_pool_size = 5
expected_per_connection = 5
requests = fixed_pool_size * expected_per_connection
pool = DB::Pool.new(DB::Pool::Options.new(
initial_pool_size: fixed_pool_size,
max_pool_size: fixed_pool_size,
max_idle_pool_size: fixed_pool_size)) {
HTTP::Client.new(URI.parse("http://127.0.0.1:#{address.port}/"))
}
done = Channel(Nil).new
requests.times do
spawn do
pool.checkout do |http|
http.get("/?delay=0.1")
end
done.send(nil)
end
end
spawn do
requests.times { done.receive }
done.close
end
wait_for { done.closed? }
requests_per_connection.values.should eq([expected_per_connection] * fixed_pool_size)
end
end
end

54
spec/manual/load_test.cr Normal file
View file

@ -0,0 +1,54 @@
# This file is to be executed as:
#
# % crystal ./spec/manual/load_test.cr
#
# It generates a number of producers and consumers. If the process hangs
# it means that the connection pool is not working properly. Likely a race condition.
#
require "../dummy_driver"
require "../../src/db"
require "json"
CONNECTION = "dummy://host?initial_pool_size=5&max_pool_size=5&max_idle_pool_size=5"
alias TChannel = Channel(Int32)
alias TDone = Channel(Bool)
COUNT = 200
def start_consumer(channel : TChannel, done : TDone)
spawn do
indeces = Set(Int32).new
loop do
indeces << channel.receive
puts "Received size=#{indeces.size}"
break if indeces.size == COUNT
end
done.send true
end
end
def start_producers(channel : TChannel)
db = DB.open CONNECTION do |db|
sql = "1,title,description,2023 " * 100_000
COUNT.times do |index|
spawn(name: "prod #{index}") do
puts "Sending #{index}"
_films = db.query_all(sql, as: {Int32, String, String, Int32})
rescue ex
puts "Error: #{ex.message}"
ensure
channel.send index
end
end
end
end
channel = TChannel.new
done = TDone.new
start_consumer(channel, done)
start_producers(channel)
done.receive

View file

@ -0,0 +1,68 @@
# This file is to be executed as:
#
# % crystal run --release [-Dpreview_mt] ./spec/manual/pool_concurrency_test.cr -- --options="max_pool_size=5" --duration=30 --concurrency=4
#
#
require "option_parser"
require "../dummy_driver"
require "../../src/db"
options = ""
duration = 3
concurrency = 4
OptionParser.parse do |parser|
parser.banner = "Usage: pool_concurrency_test [arguments]"
parser.on("-o", "--options=VALUE", "Connection string options") { |v| options = v }
parser.on("-d", "--duration=SECONDS", "Specifies the duration in seconds") { |v| duration = v.to_i }
parser.on("-c", "--concurrency=VALUE", "Specifies the concurrent requests to perform") { |v| concurrency = v.to_i }
parser.on("-h", "--help", "Show this help") do
puts parser
exit
end
parser.invalid_option do |flag|
STDERR.puts "ERROR: #{flag} is not a valid option."
STDERR.puts parser
exit(1)
end
end
multi_threaded = {% if flag?(:preview_mt) %} ENV["CRYSTAL_WORKERS"]?.try(&.to_i?) || 4 {% else %} false {% end %}
release = {% if flag?(:release) %} true {% else %} false {% end %}
if !release
puts "WARNING: This should be run in release mode."
end
db = DB.open "dummy://host?#{options}"
start_time = Time.monotonic
puts "Starting test for #{duration} seconds..."
concurrency.times do
spawn do
loop do
db.scalar "1"
Fiber.yield
end
end
end
sleep duration.seconds
end_time = Time.monotonic
puts " Options : #{options}"
puts " Duration (sec) : #{duration} (actual #{end_time - start_time})"
puts " Concurrency : #{concurrency}"
puts " Multi Threaded : #{multi_threaded ? "Yes (#{multi_threaded})" : "No"}"
puts "Total Connections : #{DummyDriver::DummyConnection.connections_count}"
puts " Total Statements : #{DummyDriver::DummyStatement.statements_count}"
puts " Total Queries : #{DummyDriver::DummyStatement.statements_exec_count}"
puts " Throughput (q/s) : #{DummyDriver::DummyStatement.statements_exec_count / duration}"
if !release
puts "WARNING: This should be run in release mode."
end

View file

@ -57,15 +57,19 @@ class Closable
end
end
private def create_pool(**options, &factory : -> T) forall T
DB::Pool.new(DB::Pool::Options.new(**options), &factory)
end
describe DB::Pool do
it "should use proc to create objects" do
block_called = 0
pool = DB::Pool.new(initial_pool_size: 3) { block_called += 1; Closable.new }
pool = create_pool(initial_pool_size: 3) { block_called += 1; Closable.new }
block_called.should eq(3)
end
it "should get resource" do
pool = DB::Pool.new { Closable.new }
pool = create_pool { Closable.new }
resource = pool.checkout
resource.should be_a Closable
resource.before_checkout_called.should be_true
@ -73,18 +77,18 @@ describe DB::Pool do
it "should be available if not checkedout" do
resource = uninitialized Closable
pool = DB::Pool.new(initial_pool_size: 1) { resource = Closable.new }
pool = create_pool(initial_pool_size: 1) { resource = Closable.new }
pool.is_available?(resource).should be_true
end
it "should not be available if checkedout" do
pool = DB::Pool.new { Closable.new }
pool = create_pool { Closable.new }
resource = pool.checkout
pool.is_available?(resource).should be_false
end
it "should be available if returned" do
pool = DB::Pool.new { Closable.new }
pool = create_pool { Closable.new }
resource = pool.checkout
resource.after_release_called.should be_false
pool.release resource
@ -93,7 +97,7 @@ describe DB::Pool do
end
it "should wait for available resource" do
pool = DB::Pool.new(max_pool_size: 1, initial_pool_size: 1) { Closable.new }
pool = create_pool(max_pool_size: 1, initial_pool_size: 1) { Closable.new }
b_cnn_request = ShouldSleepingOp.new
wait_a = WaitFor.new
@ -121,7 +125,7 @@ describe DB::Pool do
it "should create new if max was not reached" do
block_called = 0
pool = DB::Pool.new(max_pool_size: 2, initial_pool_size: 1) { block_called += 1; Closable.new }
pool = create_pool(max_pool_size: 2, initial_pool_size: 1) { block_called += 1; Closable.new }
block_called.should eq 1
pool.checkout
block_called.should eq 1
@ -131,7 +135,7 @@ describe DB::Pool do
it "should reuse returned resources" do
all = [] of Closable
pool = DB::Pool.new(max_pool_size: 2, initial_pool_size: 1) { Closable.new.tap { |c| all << c } }
pool = create_pool(max_pool_size: 2, initial_pool_size: 1) { Closable.new.tap { |c| all << c } }
pool.checkout
b1 = pool.checkout
pool.release b1
@ -143,7 +147,7 @@ describe DB::Pool do
it "should close available and total" do
all = [] of Closable
pool = DB::Pool.new(max_pool_size: 2, initial_pool_size: 1) { Closable.new.tap { |c| all << c } }
pool = create_pool(max_pool_size: 2, initial_pool_size: 1) { Closable.new.tap { |c| all << c } }
a = pool.checkout
b = pool.checkout
pool.release b
@ -157,7 +161,7 @@ describe DB::Pool do
end
it "should timeout" do
pool = DB::Pool.new(max_pool_size: 1, checkout_timeout: 0.1) { Closable.new }
pool = create_pool(max_pool_size: 1, checkout_timeout: 0.1) { Closable.new }
pool.checkout
expect_raises DB::PoolTimeout do
pool.checkout
@ -165,7 +169,7 @@ describe DB::Pool do
end
it "should be able to release after a timeout" do
pool = DB::Pool.new(max_pool_size: 1, checkout_timeout: 0.1) { Closable.new }
pool = create_pool(max_pool_size: 1, checkout_timeout: 0.1) { Closable.new }
a = pool.checkout
pool.checkout rescue nil
pool.release a
@ -173,7 +177,7 @@ describe DB::Pool do
it "should close if max idle amount is reached" do
all = [] of Closable
pool = DB::Pool.new(max_pool_size: 3, max_idle_pool_size: 1) { Closable.new.tap { |c| all << c } }
pool = create_pool(max_pool_size: 3, max_idle_pool_size: 1) { Closable.new.tap { |c| all << c } }
pool.checkout
pool.checkout
pool.checkout
@ -190,9 +194,26 @@ describe DB::Pool do
all[2].closed?.should be_false
end
it "should not return closed resources to the pool" do
pool = create_pool(max_pool_size: 1, max_idle_pool_size: 1) { Closable.new }
# pool size 1 should be reusing the one resource
resource1 = pool.checkout
pool.release resource1
resource2 = pool.checkout
resource1.should eq resource2
# it should not return a closed resource to the pool
resource2.close
pool.release resource2
resource2 = pool.checkout
resource1.should_not eq resource2
end
it "should create resource after max_pool was reached if idle forced some close up" do
all = [] of Closable
pool = DB::Pool.new(max_pool_size: 3, max_idle_pool_size: 1) { Closable.new.tap { |c| all << c } }
pool = create_pool(max_pool_size: 3, max_idle_pool_size: 1) { Closable.new.tap { |c| all << c } }
pool.checkout
pool.checkout
pool.checkout

View file

@ -81,6 +81,29 @@ class ModelWithJSON
property c1 : String
end
struct ModelWithEnum
include DB::Serializable
getter c0 : Int32
getter c1 : MyEnum
# Ensure multiple enum types work together
getter c2 : MyOtherEnum
enum MyEnum
Foo = 0
Bar = 1
Baz = 2
Quux = 3
end
enum MyOtherEnum
OMG
LOL
WTF
BBQ
end
end
macro from_dummy(query, type)
with_dummy do |db|
rs = db.query({{ query }})
@ -105,19 +128,19 @@ describe "DB::Serializable" do
end
it "should fail to initialize a simple model if types do not match" do
expect_raises ArgumentError do
expect_raises DB::MappingException, /Invalid Int32: "?b"?\n deserializing SimpleModel#c0/ do
from_dummy("b,a", SimpleModel)
end
end
it "should fail to initialize a simple model if there is a missing column" do
expect_raises DB::MappingException do
expect_raises DB::MappingException, "Missing column c1\n deserializing SimpleModel#c1" do
from_dummy("1", SimpleModel)
end
end
it "should fail to initialize a simple model if there is an unexpected column" do
expect_raises DB::MappingException do
expect_raises DB::MappingException, "Unknown column: c2\n deserializing SimpleModel" do
from_dummy("1,a,b", SimpleModel)
end
end
@ -172,6 +195,17 @@ describe "DB::Serializable" do
expect_model("1,a", ModelWithJSON, {c0: 1, c1: "a"})
end
it "should initialize a model with an enum property" do
expect_model("1,2,LOL", ModelWithEnum, {
c0: 1,
c1: ModelWithEnum::MyEnum::Baz,
c2: ModelWithEnum::MyOtherEnum::LOL,
})
expect_raises DB::MappingException, "Unknown enum ModelWithEnum::MyEnum value: adsf" do
from_dummy("1,adsf,BBQ", ModelWithEnum)
end
end
it "should initialize multiple instances from a single resultset" do
with_dummy do |db|
db.query("1,a 2,b") do |rs|

View file

@ -1,4 +1,5 @@
require "./spec_helper"
require "log/spec"
describe DB::Statement do
it "should build prepared statements" do
@ -33,6 +34,60 @@ describe DB::Statement do
end
end
describe "prepared_statements_cache flag" do
it "should reuse prepared statements if true" do
with_dummy_connection("prepared_statements=true&prepared_statements_cache=true") do |cnn|
stmt1 = cnn.query("the query").statement
stmt2 = cnn.query("the query").statement
stmt1.object_id.should eq(stmt2.object_id)
end
end
it "should leave statements open to be reused if true" do
with_dummy_connection("prepared_statements=true&prepared_statements_cache=true") do |cnn|
rs = cnn.query("the query")
# do not close while iterating
rs.statement.closed?.should be_false
rs.close
# do not close to be reused
rs.statement.closed?.should be_false
end
end
it "should not reuse prepared statements if false" do
with_dummy_connection("prepared_statements=true&prepared_statements_cache=false") do |cnn|
stmt1 = cnn.query("the query").statement
stmt2 = cnn.query("the query").statement
stmt1.object_id.should_not eq(stmt2.object_id)
end
end
it "should close statements if false" do
with_dummy_connection("prepared_statements=true&prepared_statements_cache=false") do |cnn|
rs = cnn.query("the query")
# do not close while iterating
rs.statement.closed?.should be_false
rs.close
# do close after iterating
rs.statement.closed?.should be_true
end
end
it "should not close statements if false and created explicitly" do
with_dummy_connection("prepared_statements=true&prepared_statements_cache=false") do |cnn|
stmt = cnn.prepared("the query")
rs = stmt.query
# do not close while iterating
stmt.closed?.should be_false
rs.close
# do not close after iterating
stmt.closed?.should be_false
end
end
end
it "should initialize positional params in query" do
with_dummy_connection do |cnn|
stmt = cnn.prepared("the query").as(DummyDriver::DummyStatement)
@ -208,4 +263,49 @@ describe DB::Statement do
db.pool.is_available?(DummyDriver::DummyConnection.connections.first)
end
end
it "raises NoResultsError for scalar" do
with_dummy_connection do |cnn|
stmt = cnn.prepared ""
expect_raises DB::NoResultsError do
stmt.scalar "SELECT LIMIT 0"
end
end
end
describe "logging" do
it "exec with no arguments" do
Log.capture(DB::Log.source) do |logs|
with_dummy do |db|
db.exec "42"
end
entry = logs.check(:debug, /Executing query/i).entry
entry.data[:query].should eq("42")
entry.data[:args].as_a.should be_empty
end
end
it "query with arguments" do
Log.capture(DB::Log.source) do |logs|
with_dummy do |db|
db.exec "1, ?", args: ["a"]
db.exec "2, ?", "a"
db.exec "3, ?", ["a"]
end
entry = logs.check(:debug, /Executing query/i).entry
entry.data[:query].should eq("1, ?")
entry.data[:args][0].as_s.should eq("a")
entry = logs.check(:debug, /Executing query/i).entry
entry.data[:query].should eq("2, ?")
entry.data[:args][0].as_s.should eq("a")
entry = logs.check(:debug, /Executing query/i).entry
entry.data[:query].should eq("3, ?")
entry.data[:args][0][0].as_s.should eq("a")
end
end
end
end

16
spec/support/fibers.cr Normal file
View file

@ -0,0 +1,16 @@
def wait_until_blocked(f : Fiber, timeout = 5.seconds)
now = Time.monotonic
until f.resumable?
Fiber.yield
raise "fiber failed to block within #{timeout}" if (Time.monotonic - now) > timeout
end
end
def wait_until_finished(f : Fiber, timeout = 5.seconds)
now = Time.monotonic
until f.dead?
Fiber.yield
raise "fiber failed to finish within #{timeout}" if (Time.monotonic - now) > timeout
end
end

48
spec/support/http.cr Normal file
View file

@ -0,0 +1,48 @@
require "http"
require "./fibers"
def wait_for(timeout = 5.seconds)
now = Time.monotonic
until yield
Fiber.yield
if (Time.monotonic - now) > timeout
raise "block failed to evaluate to true within #{timeout}"
end
end
end
# Helper method which runs *server*
# 1. Spawns `server.listen` in a new fiber.
# 2. Waits until `server.listening?`.
# 3. Yields to the given block.
# 4. Ensures the server is closed.
# 5. After returning from the block, it waits for the server to gracefully
# shut down before continuing execution in the current fiber.
# 6. If the listening fiber raises an exception, it is rescued and re-raised
# in the current fiber.
def run_server(server)
server_done = Channel(Exception?).new
f = spawn do
server.listen
rescue exc
server_done.send exc
else
server_done.send nil
end
begin
wait_for { server.listening? }
wait_until_blocked f
yield server_done
ensure
server.close unless server.closed?
if exc = server_done.receive
raise exc
end
end
end

View file

@ -95,6 +95,20 @@ describe DB::Transaction do
t.committed.should be_false
end
it "transaction with block from connection should be committed if `return` is called" do
t = uninitialized DummyDriver::DummyTransaction
with_witness do |w|
with_dummy_connection do |cnn|
t = return_from_txn(cnn).as(DummyDriver::DummyTransaction)
w.check
end
end
t.rolledback.should be_false
t.committed.should be_true
end
it "transaction can be committed within block" do
with_dummy_connection do |cnn|
cnn.transaction do |tx|
@ -175,4 +189,45 @@ describe DB::Transaction do
db.pool.is_available?(cnn).should be_true
end
end
it "returns block value when sucess" do
with_dummy_connection do |cnn|
res = cnn.transaction do |tx|
42
end
res.should eq(42)
typeof(res).should eq(Int32 | Nil)
end
end
it "returns value on rollback via method" do
with_dummy_connection do |cnn|
res = cnn.transaction do |tx|
tx.rollback
42
end
res.should eq(42)
typeof(res).should eq(Int32 | Nil)
end
end
it "returns nil on rollback via exception" do
with_dummy_connection do |cnn|
res = cnn.transaction do |tx|
raise DB::Rollback.new
42
end
res.should be_nil
typeof(res).should eq(Int32 | Nil)
end
end
end
private def return_from_txn(cnn)
cnn.transaction do |tx|
return tx
end
end

View file

@ -1,4 +1,5 @@
require "uri"
require "log"
# The DB module is a unified interface for database access.
# Individual database systems are supported by specific database driver shards.
@ -56,7 +57,7 @@ require "uri"
# args = [] of DB::Any
# args << "Sarah"
# args << 33
# db.exec "insert into contacts values (?, ?)", args
# db.exec "insert into contacts values (?, ?)", args: args
#
# puts "max age:"
# puts db.scalar "select max(age) from contacts" # => 33
@ -74,7 +75,13 @@ require "uri"
# end
# ```
#
# ### Object mapping
#
# The `DB::Serializable` module implements a declarative mapping from DB result
# sets to Crystal types.
module DB
Log = ::Log.for(self)
# Types supported to interface with database driver.
# These can be used in any `ResultSet#read` or any `Database#query` related
# method to be used as query parameters
@ -134,7 +141,7 @@ module DB
build_connection(uri)
end
# ditto
# :ditto:
def self.connect(uri : URI | String, &block)
cnn = build_connection(uri)
begin
@ -149,7 +156,13 @@ module DB
end
private def self.build_database(uri : URI)
Database.new(build_driver(uri), uri)
driver = build_driver(uri)
params = HTTP::Params.parse(uri.query || "")
connection_options = driver.connection_options(params)
pool_options = driver.pool_options(params)
builder = driver.connection_builder(uri)
factory = ->{ builder.build }
Database.new(connection_options, pool_options, &factory)
end
private def self.build_connection(connection_string : String)
@ -157,7 +170,7 @@ module DB
end
private def self.build_connection(uri : URI)
build_driver(uri).build_connection(SingleConnectionContext.new(uri)).as(Connection)
build_driver(uri).connection_builder(uri).build
end
private def self.build_driver(uri : URI)
@ -185,6 +198,7 @@ require "./db/enumerable_concat"
require "./db/query_methods"
require "./db/session_methods"
require "./db/disposable"
require "./db/connection_builder"
require "./db/driver"
require "./db/statement"
require "./db/begin_transaction"

View file

@ -11,22 +11,28 @@ module DB
# The exception thrown is bubbled unless it is a `DB::Rollback`.
# From the yielded object `Transaction#commit` or `Transaction#rollback`
# can be called explicitly.
def transaction
tx = begin_transaction
# Returns the value of the block.
def transaction(& : Transaction -> T) : T? forall T
rollback = false
# TODO: Cast to workaround crystal-lang/crystal#9483
# begin_transaction returns a Tx where Tx < Transaction
tx = begin_transaction.as(Transaction)
begin
yield tx
res = yield tx
rescue DB::Rollback
tx.rollback unless tx.closed?
rollback = true
res
rescue e
unless tx.closed?
# Ignore error in rollback.
# It would only be a secondary error to the original one, caused by
# corrupted connection state.
tx.rollback rescue nil
end
rollback = true
raise e
else
tx.commit unless tx.closed?
ensure
unless tx.closed?
if rollback
tx.rollback
else
tx.commit
end
end
end
end
end

View file

@ -23,21 +23,44 @@ module DB
include SessionMethods(Connection, Statement)
include BeginTransaction
record Options,
# Return whether the statements should be prepared by default
prepared_statements : Bool = true,
# Return whether the prepared statements should be cached or not
prepared_statements_cache : Bool = true do
def self.from_http_params(params : HTTP::Params, default = Options.new)
Options.new(
prepared_statements: DB.fetch_bool(params, "prepared_statements", default.prepared_statements),
prepared_statements_cache: DB.fetch_bool(params, "prepared_statements_cache", default.prepared_statements)
)
end
end
# :nodoc:
getter context
property context : ConnectionContext = SingleConnectionContext.default
@statements_cache = StringKeyCache(Statement).new
@transaction = false
getter? prepared_statements : Bool
# :nodoc:
property auto_release : Bool = true
def initialize(@context : ConnectionContext)
@prepared_statements = @context.prepared_statements?
def initialize(@options : Options)
end
def prepared_statements? : Bool
@options.prepared_statements
end
def prepared_statements_cache? : Bool
@options.prepared_statements_cache
end
# :nodoc:
def fetch_or_build_prepared_statement(query) : Statement
@statements_cache.fetch(query) { build_prepared_statement(query) }
if @options.prepared_statements_cache
@statements_cache.fetch(query) { build_prepared_statement(query) }
else
build_prepared_statement(query)
end
end
# :nodoc:
@ -59,7 +82,7 @@ module DB
protected def do_close
@statements_cache.each_value &.close
@statements_cache.clear
@context.discard self
context.discard self
end
# :nodoc:
@ -75,7 +98,7 @@ module DB
# managed by the database. Should be used
# only if the connection was obtained by `Database#checkout`.
def release
@context.release(self)
context.release(self)
end
# :nodoc:

View file

@ -0,0 +1,8 @@
module DB
# A connection factory with a specific configuration.
#
# See `Driver#connection_builder`.
abstract class ConnectionBuilder
abstract def build : Connection
end
end

View file

@ -1,11 +1,5 @@
module DB
module ConnectionContext
# Returns the uri with the connection settings to the database
abstract def uri : URI
# Return whether the statements should be prepared by default
abstract def prepared_statements? : Bool
# Indicates that the *connection* was permanently closed
# and should not be used in the future.
abstract def discard(connection : Connection)
@ -19,13 +13,7 @@ module DB
class SingleConnectionContext
include ConnectionContext
getter uri : URI
getter? prepared_statements : Bool
def initialize(@uri : URI)
params = HTTP::Params.parse(uri.query || "")
@prepared_statements = DB.fetch_bool(params, "prepared_statements", true)
end
class_getter default : SingleConnectionContext = SingleConnectionContext.new
def discard(connection : Connection)
end

View file

@ -10,8 +10,9 @@ module DB
#
# ## Database URI
#
# Connection parameters are configured in a URI. The format is specified by the individual
# database drivers. See the [reference book](https://crystal-lang.org/reference/database/) for examples.
# Connection parameters are usually in a URI. The format is specified by the individual
# database drivers, yet there are some common properties names usually shared.
# See the [reference book](https://crystal-lang.org/reference/database/) for examples.
#
# The connection pool can be configured from URI parameters:
#
@ -31,35 +32,45 @@ module DB
include SessionMethods(Database, PoolStatement)
include ConnectionContext
# :nodoc:
getter driver
# :nodoc:
getter pool
# Returns the uri with the connection settings to the database
getter uri : URI
getter? prepared_statements : Bool
@connection_options : Connection::Options
@pool : Pool(Connection)
@setup_connection : Connection -> Nil
@statements_cache = StringKeyCache(PoolPreparedStatement).new
# :nodoc:
def initialize(@driver : Driver, @uri : URI)
params = HTTP::Params.parse(uri.query || "")
@prepared_statements = DB.fetch_bool(params, "prepared_statements", true)
pool_options = @driver.connection_pool_options(params)
# Initialize a database with the specified options and connection factory.
# This covers more advanced use cases that might not be supported by an URI connection string such as tunneling connection.
def initialize(connection_options : Connection::Options, pool_options : Pool::Options, &factory : -> Connection)
@connection_options = connection_options
@setup_connection = ->(conn : Connection) {}
@pool = uninitialized Pool(Connection) # in order to use self in the factory proc
@pool = Pool.new(**pool_options) {
conn = @driver.build_connection(self).as(Connection)
@pool = Pool(Connection).new(pool_options) {
conn = factory.call
conn.auto_release = false
conn.context = self
@setup_connection.call conn
conn
}
end
def prepared_statements? : Bool
@connection_options.prepared_statements
end
def prepared_statements_cache? : Bool
@connection_options.prepared_statements_cache
end
# Run the specified block every time a new connection is established, yielding the new connection
# to the block.
#
# ```
# db = DB.open(DB_URL)
# db.setup_connection do |connection|
# connection.exec "SET TIME ZONE 'America/New_York'"
# end
# ```
def setup_connection(&proc : Connection -> Nil)
@setup_connection = proc
@pool.each_resource do |conn|
@ -69,9 +80,6 @@ module DB
# Closes all connection to the database.
def close
@statements_cache.each_value &.close
@statements_cache.clear
@pool.close
end
@ -87,11 +95,6 @@ module DB
# :nodoc:
def fetch_or_build_prepared_statement(query) : PoolStatement
@statements_cache.fetch(query) { build_prepared_statement(query) }
end
# :nodoc:
def build_prepared_statement(query) : PoolStatement
PoolPreparedStatement.new(self, query)
end

View file

@ -1,21 +1,34 @@
module DB
# Database driver implementors must subclass `Driver`,
# register with a driver_name using `DB#register_driver` and
# override the factory method `#build_connection`.
# override the factory method `#connection_builder`.
#
# ```
# require "db"
#
# class FakeDriver < DB::Driver
# def build_connection(context : DB::ConnectionContext)
# FakeConnection.new context
# class FakeConnectionBuilder < DB::ConnectionBuilder
# def initialize(@options : DB::Connection::Options)
# end
#
# def build : DB::Connection
# FakeConnection.new(@options)
# end
# end
#
# def connection_builder(uri : URI) : ConnectionBuilder
# params = HTTP::Params.parse(uri.query || "")
# options = connection_options(params)
# # If needed, parse custom options from uri here
# # so they are parsed only once.
# FakeConnectionBuilder.new(options)
# end
# end
#
# DB.register_driver "fake", FakeDriver
# ```
#
# Access to this fake datbase will be available with
# Access to this fake database will be available with
#
# ```
# DB.open "fake://..." do |db|
@ -25,18 +38,22 @@ module DB
#
# Refer to `Connection`, `Statement` and `ResultSet` for further
# driver implementation instructions.
#
# Override `#connection_options` and `#pool_options` to provide custom
# defaults or parsing of the connection string URI.
abstract class Driver
abstract def build_connection(context : ConnectionContext) : Connection
# Returns a new connection factory.
#
# NOTE: For implementors *uri* should be parsed once. If all the options
# are sound a ConnectionBuilder is returned.
abstract def connection_builder(uri : URI) : ConnectionBuilder
def connection_pool_options(params : HTTP::Params)
{
initial_pool_size: params.fetch("initial_pool_size", 1).to_i,
max_pool_size: params.fetch("max_pool_size", 0).to_i,
max_idle_pool_size: params.fetch("max_idle_pool_size", 1).to_i,
checkout_timeout: params.fetch("checkout_timeout", 5.0).to_f,
retry_attempts: params.fetch("retry_attempts", 1).to_i,
retry_delay: params.fetch("retry_delay", 1.0).to_f,
}
def connection_options(params : HTTP::Params) : Connection::Options
Connection::Options.from_http_params(params)
end
def pool_options(params : HTTP::Params) : Pool::Options
Pool::Options.from_http_params(params)
end
end
end

View file

@ -20,7 +20,7 @@ module DB
end
# returns given `e1 : T` an `Enumerable(T')` and `e2 : U` an `Enumerable(U') | Nil`
# it retuns and `Enumerable(T' | U')` that enumerates the elements of `e1`
# it returns an `Enumerable(T' | U')` that enumerates the elements of `e1`
# and, later, the elements of `e2`.
def self.build(e1 : T, e2 : U)
return e1 if e2.nil? || e2.empty?

View file

@ -1,8 +1,24 @@
module DB
abstract class Connection
end
class Error < Exception
end
class MappingException < Error
getter klass
getter property
def initialize(message, @klass : String, @property : String? = nil, cause : Exception? = nil)
message = String.build do |io|
io << message
io << "\n deserializing " << @klass
if property = @property
io << "#" << property
end
end
super(message, cause: cause)
end
end
class PoolTimeout < Error
@ -11,22 +27,50 @@ module DB
class PoolRetryAttemptsExceeded < Error
end
class PoolResourceLost(T) < Error
getter resource : T
def initialize(@resource : T, cause : Exception? = nil)
super(cause: cause)
@resource.close
end
end
class PoolResourceRefused < Error
end
# Raised when an established connection is lost
# probably due to socket/network issues.
# It is used by the connection pool retry logic.
class ConnectionLost < Error
getter connection : Connection
def initialize(@connection)
class ConnectionLost < PoolResourceLost(Connection)
def connection
resource
end
end
# Raised when a connection is unable to be established
# probably due to socket/network or configuration issues.
# It is used by the connection pool retry logic.
class ConnectionRefused < Error
class ConnectionRefused < PoolResourceRefused
end
class Rollback < Error
end
# Raised when a scalar query returns no results.
class NoResultsError < Error
end
# Raised when the type returned for the column value
# does not match the type expected.
class ColumnTypeMismatchError < Error
getter column_index : Int32
getter column_name : String
getter column_type : String
getter expected_type : String
def initialize(*, context : String, @column_index : Int32, @column_name : String, @column_type : String, @expected_type : String)
super("In #{context} the column #{column_name} returned a #{column_type} but a #{expected_type} was expected.")
end
end
end

View file

@ -1,5 +1,6 @@
module DB
# Empty module used for marking a class as supporting DB:Mapping
@[Deprecated("Use `DB::Serializable` instead")]
module Mappable; end
# The `DB.mapping` macro defines how an object is built from a `ResultSet`.
@ -8,7 +9,7 @@ module DB
# Once defined, `ResultSet#read(t)` populates properties of the class from the
# `ResultSet`.
#
# ```crystal
# ```
# require "db"
#
# class Employee
@ -28,7 +29,7 @@ module DB
#
# You can also define attributes for each property.
#
# ```crystal
# ```
# class Employee
# DB.mapping({
# title: String,
@ -57,6 +58,7 @@ module DB
# it and initializes this type's instance variables.
#
# This macro also declares instance variables of the types given in the mapping.
@[Deprecated("Use `DB::Serializable` instead")]
macro mapping(properties, strict = true)
include ::DB::Mappable
@ -117,7 +119,7 @@ module DB
{% end %}
else
{% if strict %}
raise ::DB::MappingException.new("unknown result set attribute: #{col_name}")
raise ::DB::MappingException.new("unknown result set attribute: #{col_name}", self.class.to_s)
{% else %}
%rs.read
{% end %}
@ -127,7 +129,7 @@ module DB
{% for key, value in properties %}
{% unless value[:nilable] || value[:default] != nil %}
if %var{key.id}.is_a?(Nil) && !%found{key.id}
raise ::DB::MappingException.new("missing result set attribute: {{(value[:key] || key).id}}")
raise ::DB::MappingException.new("missing result set attribute: {{(value[:key] || key).id}}", self.class.to_s)
end
{% end %}
{% end %}
@ -148,6 +150,7 @@ module DB
end
end
@[Deprecated("Use `DB::Serializable` instead")]
macro mapping(**properties)
::DB.mapping({{properties}})
end

View file

@ -1,7 +1,34 @@
require "weak_ref"
require "./error"
module DB
class Pool(T)
record Options,
# initial number of connections in the pool
initial_pool_size : Int32 = 1,
# maximum amount of connections in the pool (Idle + InUse). 0 means no maximum.
max_pool_size : Int32 = 0,
# maximum amount of idle connections in the pool
max_idle_pool_size : Int32 = 1,
# seconds to wait before timeout while doing a checkout
checkout_timeout : Float64 = 5.0,
# maximum amount of retry attempts to reconnect to the db. See `Pool#retry`
retry_attempts : Int32 = 1,
# seconds to wait before a retry attempt
retry_delay : Float64 = 0.2 do
def self.from_http_params(params : HTTP::Params, default = Options.new)
Options.new(
initial_pool_size: params.fetch("initial_pool_size", default.initial_pool_size).to_i,
max_pool_size: params.fetch("max_pool_size", default.max_pool_size).to_i,
max_idle_pool_size: params.fetch("max_idle_pool_size", default.max_idle_pool_size).to_i,
checkout_timeout: params.fetch("checkout_timeout", default.checkout_timeout).to_f,
retry_attempts: params.fetch("retry_attempts", default.retry_attempts).to_i,
retry_delay: params.fetch("retry_delay", default.retry_delay).to_f,
)
end
end
# Pool configuration
# initial number of connections in the pool
@ -30,15 +57,29 @@ module DB
# communicate that a connection is available for checkout
@availability_channel : Channel(Nil)
# signal how many existing connections are waited for
@waiting_resource : Int32
# global pool mutex
@mutex : Mutex
def initialize(@initial_pool_size = 1, @max_pool_size = 0, @max_idle_pool_size = 1, @checkout_timeout = 5.0,
@retry_attempts = 1, @retry_delay = 0.2, &@factory : -> T)
@[Deprecated("Use `#new` with DB::Pool::Options instead")]
def initialize(initial_pool_size = 1, max_pool_size = 0, max_idle_pool_size = 1, checkout_timeout = 5.0,
retry_attempts = 1, retry_delay = 0.2, &factory : -> T)
initialize(
Options.new(
initial_pool_size: initial_pool_size, max_pool_size: max_pool_size,
max_idle_pool_size: max_idle_pool_size, checkout_timeout: checkout_timeout,
retry_attempts: retry_attempts, retry_delay: retry_delay),
&factory)
end
def initialize(pool_options : Options = Options.new, &@factory : -> T)
@initial_pool_size = pool_options.initial_pool_size
@max_pool_size = pool_options.max_pool_size
@max_idle_pool_size = pool_options.max_idle_pool_size
@checkout_timeout = pool_options.checkout_timeout
@retry_attempts = pool_options.retry_attempts
@retry_delay = pool_options.retry_delay
@availability_channel = Channel(Nil).new
@waiting_resource = 0
@inflight = 0
@mutex = Mutex.new
@ -52,12 +93,19 @@ module DB
@idle.clear
end
record Stats, open_connections : Int32
record Stats,
open_connections : Int32,
idle_connections : Int32,
in_flight_connections : Int32,
max_connections : Int32
# Returns stats of the pool
def stats
Stats.new(
open_connections: @total.size
open_connections: @total.size,
idle_connections: @idle.size,
in_flight_connections: @inflight,
max_connections: @max_pool_size,
)
end
@ -69,8 +117,11 @@ module DB
resource = if @idle.empty?
if can_increase_pool?
@inflight += 1
r = unsync { build_resource }
@inflight -= 1
begin
r = unsync { build_resource }
ensure
@inflight -= 1
end
r
else
unsync { wait_for_available }
@ -91,38 +142,33 @@ module DB
resource
end
res.before_checkout
if res.responds_to?(:before_checkout)
res.before_checkout
end
res
end
# ```
# selected, is_candidate = pool.checkout_some(candidates)
# ```
# `selected` be a resource from the `candidates` list and `is_candidate` == `true`
# or `selected` will be a new resource and `is_candidate` == `false`
def checkout_some(candidates : Enumerable(WeakRef(T))) : {T, Bool}
sync do
candidates.each do |ref|
resource = ref.value
if resource && is_available?(resource)
@idle.delete resource
resource.before_checkout
return {resource, true}
end
end
end
def checkout(&block : T ->)
connection = checkout
resource = checkout
{resource, candidates.any? { |ref| ref.value == resource }}
begin
yield connection
ensure
release connection
end
end
def release(resource : T) : Nil
idle_pushed = false
sync do
if can_increase_idle_pool
if resource.responds_to?(:closed?) && resource.closed?
@total.delete(resource)
elsif can_increase_idle_pool
@idle << resource
resource.after_release
if resource.responds_to?(:after_release)
resource.after_release
end
idle_pushed = true
else
resource.close
@ -130,8 +176,11 @@ module DB
end
end
if idle_pushed && are_waiting_for_resource?
@availability_channel.send nil
if idle_pushed
select
when @availability_channel.send(nil)
else
end
end
end
@ -153,12 +202,12 @@ module DB
begin
sleep @retry_delay if i >= current_available
return yield
rescue e : ConnectionLost
# if the connection is lost close it to release resources
# and remove it from the known pool.
sync { delete(e.connection) }
e.connection.close
rescue e : ConnectionRefused
rescue e : PoolResourceLost(T)
# if the connection is lost it will be closed by
# the exception to release resources
# we still need to remove it from the known pool.
sync { delete(e.resource) }
rescue e : PoolResourceRefused
# a ConnectionRefused means a new connection
# was intended to be created
# nothing to due but to retry soon
@ -189,8 +238,10 @@ module DB
private def build_resource : T
resource = @factory.call
@total << resource
@idle << resource
sync do
@total << resource
@idle << resource
end
resource
end
@ -207,37 +258,13 @@ module DB
end
private def wait_for_available
timeout = TimeoutHelper.new(@checkout_timeout.to_f64)
sync_inc_waiting_resource
timeout.start
# TODO update to select keyword for crystal 0.19
index, _ = Channel.select(@availability_channel.receive_select_action, timeout.receive_select_action)
case index
when 0
timeout.cancel
sync_dec_waiting_resource
when 1
sync_dec_waiting_resource
raise DB::PoolTimeout.new
else
raise DB::Error.new
select
when @availability_channel.receive
when timeout(@checkout_timeout.seconds)
raise DB::PoolTimeout.new("Could not check out a connection in #{@checkout_timeout} seconds")
end
end
private def sync_inc_waiting_resource
sync { @waiting_resource += 1 }
end
private def sync_dec_waiting_resource
sync { @waiting_resource -= 1 }
end
private def are_waiting_for_resource?
@waiting_resource > 0
end
private def sync
@mutex.lock
begin
@ -255,29 +282,5 @@ module DB
@mutex.lock
end
end
class TimeoutHelper
def initialize(@timeout : Float64)
@abort_timeout = false
@timeout_channel = Channel(Nil).new
end
def receive_select_action
@timeout_channel.receive_select_action
end
def start
spawn do
sleep @timeout
unless @abort_timeout
@timeout_channel.send nil
end
end
end
def cancel
@abort_timeout = true
end
end
end
end

View file

@ -4,53 +4,20 @@ module DB
# The execution of the statement is retried according to the pool configuration.
#
# See `PoolStatement`
class PoolPreparedStatement < PoolStatement
# connections where the statement was prepared
@connections = Set(WeakRef(Connection)).new
struct PoolPreparedStatement < PoolStatement
def initialize(db : Database, query : String)
super
# Prepares a statement on some connection
# otherwise the preparation is delayed until the first execution.
# After the first initialization the connection must be released
# it will be checked out when executing it.
statement_with_retry &.release_connection
# TODO use a round-robin selection in the pool so multiple sequentially
# initialized statements are assigned to different connections.
end
protected def do_close
# TODO close all statements on all connections.
# currently statements are closed when the connection is closed.
# WHAT-IF the connection is busy? Should each statement be able to
# deallocate itself when the connection is free.
@connections.clear
end
# builds a statement over a real connection
# the connection is registered in `@connections`
private def build_statement : Statement
clean_connections
conn, existing = @db.checkout_some(@connections)
conn = @db.pool.checkout
begin
stmt = conn.prepared.build(@query)
conn.prepared.build(@query)
rescue ex
conn.release
raise ex
end
@connections << WeakRef.new(conn) unless existing
stmt
end
private def clean_connections
# remove disposed or closed connections
@connections.each do |ref|
conn = ref.value
if !conn || conn.closed?
@connections.delete ref
end
end
end
end
end

View file

@ -3,7 +3,7 @@ module DB
# a statement from the DB needs to be able to represent a statement in any
# of the connections of the pool. Otherwise the user will need to deal with
# actual connections in some point.
abstract class PoolStatement
abstract struct PoolStatement
include StatementMethods
def initialize(@db : Database, @query : String)
@ -15,7 +15,7 @@ module DB
end
# See `QueryMethods#exec`
def exec(*args_, args : Array? = nil) : ExecResult
def exec(*args_, args : Enumerable? = nil) : ExecResult
statement_with_retry &.exec(*args_, args: args)
end
@ -25,12 +25,12 @@ module DB
end
# See `QueryMethods#query`
def query(*args_, args : Array? = nil) : ResultSet
def query(*args_, args : Enumerable? = nil) : ResultSet
statement_with_retry &.query(*args_, args: args)
end
# See `QueryMethods#scalar`
def scalar(*args_, args : Array? = nil)
def scalar(*args_, args : Enumerable? = nil)
statement_with_retry &.scalar(*args_, args: args)
end

View file

@ -4,15 +4,11 @@ module DB
# The execution of the statement is retried according to the pool configuration.
#
# See `PoolStatement`
class PoolUnpreparedStatement < PoolStatement
struct PoolUnpreparedStatement < PoolStatement
def initialize(db : Database, query : String)
super
end
protected def do_close
# unprepared statements do not need to be release in each connection
end
# builds a statement over a real connection
private def build_statement : Statement
conn = @db.pool.checkout

View file

@ -42,7 +42,7 @@ module DB
# result = db.query "select name from contacts where id = ?", args: [10]
# ```
#
def query(query, *args_, args : Array? = nil)
def query(query, *args_, args : Enumerable? = nil)
build(query).query(*args_, args: args)
end
@ -56,7 +56,7 @@ module DB
# end
# end
# ```
def query(query, *args_, args : Array? = nil)
def query(query, *args_, args : Enumerable? = nil)
# CHECK build(query).query(*args, &block)
rs = query(query, *args_, args: args)
yield rs ensure rs.close
@ -67,14 +67,15 @@ module DB
#
# The given block must not invoke `move_next` on the yielded result set.
#
# Raises `DB::Error` if there were no rows, or if there were more than one row.
# Raises `DB::NoResultsError` if there were no rows.
# Raises `DB::Error` if there were more than one row.
#
# ```
# name = db.query_one "select name from contacts where id = ?", 18, &.read(String)
# ```
def query_one(query, *args_, args : Array? = nil, &block : ResultSet -> U) : U forall U
def query_one(query, *args_, args : Enumerable? = nil, &block : ResultSet -> U) : U forall U
query(query, *args_, args: args) do |rs|
raise DB::Error.new("no rows") unless rs.move_next
raise DB::NoResultsError.new("no results") unless rs.move_next
value = yield rs
raise DB::Error.new("more than one row") if rs.move_next
@ -85,12 +86,13 @@ module DB
# Executes a *query* that expects a single row and returns it
# as a tuple of the given *types*.
#
# Raises `DB::Error` if there were no rows, or if there were more than one row.
# Raises `DB::NoResultsError` if there were no rows.
# Raises `DB::Error` if there were more than one row.
#
# ```
# db.query_one "select name, age from contacts where id = ?", 1, as: {String, Int32}
# ```
def query_one(query, *args_, args : Array? = nil, as types : Tuple)
def query_one(query, *args_, args : Enumerable? = nil, as types : Tuple)
query_one(query, *args_, args: args) do |rs|
rs.read(*types)
end
@ -100,12 +102,13 @@ module DB
# as a named tuple of the given *types* (the keys of the named tuple
# are not necessarily the column names).
#
# Raises `DB::Error` if there were no rows, or if there were more than one row.
# Raises `DB::NoResultsError` if there were no rows.
# Raises `DB::Error` if there were more than one row.
#
# ```
# db.query_one "select name, age from contacts where id = ?", 1, as: {name: String, age: Int32}
# ```
def query_one(query, *args_, args : Array? = nil, as types : NamedTuple)
def query_one(query, *args_, args : Enumerable? = nil, as types : NamedTuple)
query_one(query, *args_, args: args) do |rs|
rs.read(**types)
end
@ -114,12 +117,13 @@ module DB
# Executes a *query* that expects a single row
# and returns the first column's value as the given *type*.
#
# Raises `DB::Error` if there were no rows, or if there were more than one row.
# Raises `DB::NoResultsError` if there were no rows.
# Raises `DB::Error` if there were more than one row.
#
# ```
# db.query_one "select name from contacts where id = ?", 1, as: String
# ```
def query_one(query, *args_, args : Array? = nil, as type : Class)
def query_one(query, *args_, args : Enumerable? = nil, as type : Class)
query_one(query, *args_, args: args) do |rs|
rs.read(type)
end
@ -137,7 +141,7 @@ module DB
# name = db.query_one? "select name from contacts where id = ?", 18, &.read(String)
# typeof(name) # => String | Nil
# ```
def query_one?(query, *args_, args : Array? = nil, &block : ResultSet -> U) : U? forall U
def query_one?(query, *args_, args : Enumerable? = nil, &block : ResultSet -> U) : U? forall U
query(query, *args_, args: args) do |rs|
return nil unless rs.move_next
@ -158,7 +162,7 @@ module DB
# result = db.query_one? "select name, age from contacts where id = ?", 1, as: {String, Int32}
# typeof(result) # => Tuple(String, Int32) | Nil
# ```
def query_one?(query, *args_, args : Array? = nil, as types : Tuple)
def query_one?(query, *args_, args : Enumerable? = nil, as types : Tuple)
query_one?(query, *args_, args: args) do |rs|
rs.read(*types)
end
@ -176,7 +180,7 @@ module DB
# result = db.query_one? "select name, age from contacts where id = ?", 1, as: {age: String, name: Int32}
# typeof(result) # => NamedTuple(age: String, name: Int32) | Nil
# ```
def query_one?(query, *args_, args : Array? = nil, as types : NamedTuple)
def query_one?(query, *args_, args : Enumerable? = nil, as types : NamedTuple)
query_one?(query, *args_, args: args) do |rs|
rs.read(**types)
end
@ -193,7 +197,7 @@ module DB
# name = db.query_one? "select name from contacts where id = ?", 1, as: String
# typeof(name) # => String?
# ```
def query_one?(query, *args_, args : Array? = nil, as type : Class)
def query_one?(query, *args_, args : Enumerable? = nil, as type : Class)
query_one?(query, *args_, args: args) do |rs|
rs.read(type)
end
@ -205,7 +209,7 @@ module DB
# ```
# names = db.query_all "select name from contacts", &.read(String)
# ```
def query_all(query, *args_, args : Array? = nil, &block : ResultSet -> U) : Array(U) forall U
def query_all(query, *args_, args : Enumerable? = nil, &block : ResultSet -> U) : Array(U) forall U
ary = [] of U
query_each(query, *args_, args: args) do |rs|
ary.push(yield rs)
@ -219,7 +223,7 @@ module DB
# ```
# contacts = db.query_all "select name, age from contacts", as: {String, Int32}
# ```
def query_all(query, *args_, args : Array? = nil, as types : Tuple)
def query_all(query, *args_, args : Enumerable? = nil, as types : Tuple)
query_all(query, *args_, args: args) do |rs|
rs.read(*types)
end
@ -232,7 +236,7 @@ module DB
# ```
# contacts = db.query_all "select name, age from contacts", as: {name: String, age: Int32}
# ```
def query_all(query, *args_, args : Array? = nil, as types : NamedTuple)
def query_all(query, *args_, args : Enumerable? = nil, as types : NamedTuple)
query_all(query, *args_, args: args) do |rs|
rs.read(**types)
end
@ -244,7 +248,7 @@ module DB
# ```
# names = db.query_all "select name from contacts", as: String
# ```
def query_all(query, *args_, args : Array? = nil, as type : Class)
def query_all(query, *args_, args : Enumerable? = nil, as type : Class)
query_all(query, *args_, args: args) do |rs|
rs.read(type)
end
@ -258,7 +262,7 @@ module DB
# puts rs.read(String)
# end
# ```
def query_each(query, *args_, args : Array? = nil)
def query_each(query, *args_, args : Enumerable? = nil)
query(query, *args_, args: args) do |rs|
rs.each do
yield rs
@ -267,7 +271,7 @@ module DB
end
# Performs the `query` and returns an `ExecResult`
def exec(query, *args_, args : Array? = nil)
def exec(query, *args_, args : Enumerable? = nil)
build(query).exec(*args_, args: args)
end
@ -276,7 +280,7 @@ module DB
# ```
# puts db.scalar("SELECT MAX(name)").as(String) # => (a String)
# ```
def scalar(query, *args_, args : Array? = nil)
def scalar(query, *args_, args : Enumerable? = nil)
build(query).scalar(*args_, args: args)
end
end

View file

@ -29,7 +29,7 @@ module DB
end
protected def do_close
statement.release_connection
statement.release_from_result_set
end
# TODO add_next_result_set : Bool
@ -69,6 +69,11 @@ module DB
# Reads the next column value
abstract def read
# Returns the column index that corresponds to the next `#read`.
#
# If the last column of the current row has been read, it must return `#column_count`.
abstract def next_column_index : Int32
# Reads the next columns and maps them to a class
def read(type : DB::Mappable.class)
type.new(self)
@ -76,14 +81,38 @@ module DB
# Reads the next column value as a **type**
def read(type : T.class) : T forall T
col_index = next_column_index
value = read
if value.is_a?(T)
value
else
raise "#{self.class}#read returned a #{value.class}. A #{T} was expected."
raise DB::ColumnTypeMismatchError.new(
context: "#{self.class}#read",
column_index: col_index,
column_name: column_name(col_index),
column_type: value.class.to_s,
expected_type: T.to_s
)
end
end
# Read the value based on the given `enum` type, supporting both string and
# numeric column types.
#
# ```
# enum Status
# Pending
# Complete
# end
#
# db.query "SELECT 'complete'" do |rs|
# rs.read Status # => Status::Complete
# end
# ```
def read(type : Enum.class)
type.new(self)
end
# Reads the next columns and returns a tuple of the values.
def read(*types : Class)
internal_read(*types)
@ -123,3 +152,24 @@ module DB
# end
end
end
struct Enum
def self.new(rs : DB::ResultSet) : self
index = rs.next_column_index
case value = rs.read
when String
parse value
when Int
from_value value
else
raise DB::ColumnTypeMismatchError.new(
context: "#{self}.new(rs : DB::ResultSet)",
column_index: index,
column_name: rs.column_name(index),
column_type: value.class.to_s,
expected_type: "String | Int",
)
end
end
end

View file

@ -9,7 +9,7 @@ module DB
#
# ### Example
#
# ```crystal
# ```
# require "db"
#
# class Employee
@ -32,7 +32,7 @@ module DB
# Similar to `JSON::Field`, there is an annotation `DB::Field` that can be used to set serialization behavior
# on individual instance variables.
#
# ```crystal
# ```
# class Employee
# include DB::Serializable
#
@ -52,7 +52,7 @@ module DB
#
# Including this module is functionally identical to passing `{strict: false}` to `DB.mapping`: extra columns will not raise.
#
# ```crystal
# ```
# class Employee
# include DB::Serializable
# include DB::Serializable::NonStrict
@ -95,7 +95,7 @@ module DB
super
end
def self.from_rs(rs : ::DB::Result_set)
def self.from_rs(rs : ::DB::ResultSet)
super
end
end
@ -129,14 +129,18 @@ module DB
{% for name, value in properties %}
when {{value[:key]}}
%found{name} = true
%var{name} =
{% if value[:converter] %}
{{value[:converter]}}.from_rs(rs)
{% elsif value[:nilable] || value[:default] != nil %}
rs.read(::Union({{value[:type]}} | Nil))
{% else %}
rs.read({{value[:type]}})
{% end %}
begin
%var{name} =
{% if value[:converter] %}
{{value[:converter]}}.from_rs(rs)
{% elsif value[:nilable] || value[:default] != nil %}
rs.read(::Union({{value[:type]}} | Nil))
{% else %}
rs.read({{value[:type]}})
{% end %}
rescue exc
::raise ::DB::MappingException.new(exc.message, self.class.to_s, {{name.stringify}}, cause: exc)
end
{% end %}
else
rs.read # Advance set, but discard result
@ -146,8 +150,8 @@ module DB
{% for key, value in properties %}
{% unless value[:nilable] || value[:default] != nil %}
if %var{key}.is_a?(Nil) && !%found{key}
raise ::DB::MappingException.new("missing result set attribute: {{(value[:key] || key).id}}")
if %var{key}.nil? && !%found{key}
::raise ::DB::MappingException.new("Missing column {{value[:key].id}}", self.class.to_s, {{key.stringify}})
end
{% end %}
{% end %}
@ -169,7 +173,7 @@ module DB
end
protected def on_unknown_db_column(col_name)
raise ::DB::MappingException.new("unknown result set attribute: #{col_name}")
::raise ::DB::MappingException.new("Unknown column: #{col_name}", self.class.to_s)
end
module NonStrict

View file

@ -14,13 +14,27 @@ module DB
# be prepared or not.
abstract def prepared_statements? : Bool
abstract def prepared_statements_cache? : Bool
abstract def fetch_or_build_prepared_statement(query) : Stmt
abstract def build_unprepared_statement(query) : Stmt
def build(query) : Stmt
if prepared_statements?
fetch_or_build_prepared_statement(query)
stmt = fetch_or_build_prepared_statement(query)
# #build is a :nodoc: method used on QueryMethods where
# the statements are not exposed. As such if the cache
# is disabled we should auto_close the statement.
# When the statements are build explicitly the #prepared
# and #unprepared methods are used. In that case the
# statement is closed by the user explicitly also.
if !prepared_statements_cache?
stmt.auto_close = true if stmt.responds_to?(:auto_close=)
end
stmt
else
build_unprepared_statement(query)
end

View file

@ -2,24 +2,19 @@ module DB
# Common interface for connection based statements
# and for connection pool statements.
module StatementMethods
include Disposable
protected def do_close
end
# See `QueryMethods#scalar`
def scalar(*args_, args : Array? = nil)
def scalar(*args_, args : Enumerable? = nil)
query(*args_, args: args) do |rs|
rs.each do
return rs.read
end
end
raise "no results"
raise NoResultsError.new("no results")
end
# See `QueryMethods#query`
def query(*args_, args : Array? = nil)
def query(*args_, args : Enumerable? = nil)
rs = query(*args_, args: args)
yield rs ensure rs.close
end
@ -27,12 +22,12 @@ module DB
# See `QueryMethods#exec`
abstract def exec : ExecResult
# See `QueryMethods#exec`
abstract def exec(*args_, args : Array? = nil) : ExecResult
abstract def exec(*args_, args : Enumerable? = nil) : ExecResult
# See `QueryMethods#query`
abstract def query : ResultSet
# See `QueryMethods#query`
abstract def query(*args_, args : Array? = nil) : ResultSet
abstract def query(*args_, args : Enumerable? = nil) : ResultSet
end
# Represents a query in a `Connection`.
@ -47,11 +42,26 @@ module DB
# 6. `#do_close` is called to release the statement resources.
abstract class Statement
include StatementMethods
include Disposable
protected def do_close
end
# :nodoc:
getter connection
def initialize(@connection : Connection)
getter command : String
def initialize(@connection : Connection, @command : String)
end
# :nodoc:
property auto_close : Bool = false
# :nodoc:
def release_from_result_set
self.close if @auto_close
self.release_connection
end
def release_connection
@ -64,7 +74,7 @@ module DB
end
# See `QueryMethods#exec`
def exec(*args_, args : Array? = nil) : DB::ExecResult
def exec(*args_, args : Enumerable? = nil) : DB::ExecResult
perform_exec_and_release(EnumerableConcat.build(args_, args))
end
@ -74,18 +84,22 @@ module DB
end
# See `QueryMethods#query`
def query(*args_, args : Array? = nil) : DB::ResultSet
def query(*args_, args : Enumerable? = nil) : DB::ResultSet
perform_query_with_rescue(EnumerableConcat.build(args_, args))
end
private def perform_exec_and_release(args : Enumerable) : ExecResult
return perform_exec(args)
around_query_or_exec(args) do
perform_exec(args)
end
ensure
release_connection
end
private def perform_query_with_rescue(args : Enumerable) : ResultSet
return perform_query(args)
around_query_or_exec(args) do
perform_query(args)
end
rescue e : Exception
# Release connection only when an exception occurs during the query
# execution since we need the connection open while the ResultSet is open
@ -95,5 +109,90 @@ module DB
protected abstract def perform_query(args : Enumerable) : ResultSet
protected abstract def perform_exec(args : Enumerable) : ExecResult
# This method is called when executing the statement. Although it can be
# redefined, it is recommended to use the `def_around_query_or_exec` macro
# to be able to add new behaviors without loosing prior existing ones.
protected def around_query_or_exec(args : Enumerable)
yield
end
# This macro allows injecting code to be run before and after the execution
# of the request. It should return the yielded value. It must be called with 1
# block argument that will be used to pass the `args : Enumerable`.
#
# ```
# class DB::Statement
# def_around_query_or_exec do |args|
# # do something before query or exec
# res = yield
# # do something after query or exec
# res
# end
# end
# ```
macro def_around_query_or_exec(&block)
protected def around_query_or_exec(%args : Enumerable)
previous_def do
{% if block.args.size != 1 %}
{% raise "Wrong number of block arguments (given #{block.args.size}, expected: 1)" %}
{% end %}
{{ block.args.first.id }} = %args
{{ block.body }}
end
end
end
def_around_query_or_exec do |args|
emit_log(args)
yield
end
protected def emit_log(args : Enumerable)
Log.debug &.emit("Executing query", query: command, args: MetadataValueConverter.arg_to_log(args))
end
end
# This module converts DB supported values to `::Log::Metadata::Value`
#
# ### Note to implementors
#
# If the driver defines custom types to be used as arguments the default behavior
# will be converting the value via `#to_s`. Otherwise you can define overloads to
# change this behaviour.
#
# ```
# module DB::MetadataValueConverter
# def self.arg_to_log(arg : PG::Geo::Point)
# ::Log::Metadata::Value.new("(#{arg.x}, #{arg.y})::point")
# end
# end
# ```
module MetadataValueConverter
# Returns *arg* encoded as a `::Log::Metadata::Value`.
def self.arg_to_log(arg) : ::Log::Metadata::Value
::Log::Metadata::Value.new(arg.to_s)
end
# :ditto:
def self.arg_to_log(arg : Enumerable) : ::Log::Metadata::Value
::Log::Metadata::Value.new(arg.to_a.map { |a| arg_to_log(a).as(::Log::Metadata::Value) })
end
# :ditto:
def self.arg_to_log(arg : Int) : ::Log::Metadata::Value
::Log::Metadata::Value.new(arg.to_i64)
end
# :ditto:
def self.arg_to_log(arg : UInt64) : ::Log::Metadata::Value
::Log::Metadata::Value.new(arg.to_s)
end
# :ditto:
def self.arg_to_log(arg : Nil | Bool | Int32 | Int64 | Float32 | Float64 | String | Time) : ::Log::Metadata::Value
::Log::Metadata::Value.new(arg)
end
end
end

View file

@ -1,3 +1,3 @@
module DB
VERSION = "0.8.0"
VERSION = "0.13.1"
end

View file

@ -289,6 +289,62 @@ module DB
ages.should eq([10, 20, 30])
end
it "next_column_index" do |db|
db.exec sql_create_table_person
db.exec sql_insert_person, "foo", 10
db.exec sql_insert_person, "bar", 20
db.query sql_select_person do |rs|
rs.move_next
rs.next_column_index.should eq(0)
rs.read(String)
rs.next_column_index.should eq(1)
rs.read(Int32)
rs.next_column_index.should eq(2)
rs.move_next
rs.next_column_index.should eq(0)
rs.read(String)
rs.next_column_index.should eq(1)
rs.read(Int32)
rs.next_column_index.should eq(2)
end
end
it "next_column_index when ColumnTypeMismatchError" do |db|
db.exec sql_create_table_person
db.exec sql_insert_person, "foo", 10
db.exec sql_insert_person, "bar", 20
db.query sql_select_person do |rs|
rs.move_next
rs.next_column_index.should eq(0)
ex = expect_raises(ColumnTypeMismatchError) { rs.read(Int32) }
ex.column_index.should eq(0)
ex.column_name.should eq("name")
# NOTE: sqlite currently returns Int64 due to how Int32 is implemented
ex.column_type.should match(/String/)
# NOTE: pg currently returns Slice(UInt8) | String due to how String is implemented
ex.expected_type.should match(/Int/)
rs.next_column_index.should eq(1)
ex = expect_raises(ColumnTypeMismatchError) { rs.read(String) }
ex.column_index.should eq(1)
ex.column_name.should eq("age")
# NOTE: sqlite returns Int64
ex.column_type.should match(/Int/)
# NOTE: pg currently returns Slice(UInt8) | String due to how String is implemented
ex.expected_type.should match(/String/)
rs.next_column_index.should eq(2)
rs.move_next
rs.next_column_index.should eq(0)
expect_raises(ColumnTypeMismatchError) { rs.read(Int32) }
rs.next_column_index.should eq(1)
expect_raises(ColumnTypeMismatchError) { rs.read(String) }
rs.next_column_index.should eq(2)
end
end
# describe "transactions" do
it "transactions: can read inside transaction and rollback after" do |db|
db.exec sql_create_table_person
@ -340,7 +396,20 @@ module DB
# :nodoc:
def with_db(options = nil)
@before.call
DB.open("#{connection_string}#{"?#{options}" if options}") do |db|
if options
{% if compare_versions(Crystal::VERSION, "1.9.0") >= 0 %}
uri = URI.parse connection_string
uri.query_params.merge! URI::Params.parse(options)
connection_string_with_options = uri.to_s
{% else %}
raise "Crystal 1.9.0 or greater is required to run with_db with options"
{% end %}
else
connection_string_with_options = connection_string
end
DB.open(connection_string_with_options) do |db|
db.exec(sql_drop_table("table1"))
db.exec(sql_drop_table("table2"))
db.exec(sql_drop_table("person"))
@ -467,7 +536,7 @@ module DB
def self.run(description = "as a db")
ctx = self.new
with ctx yield
with ctx yield ctx
describe description do
ctx.include_shared_specs
@ -496,15 +565,19 @@ module DB
end
end
else
values.each do |prepared_statements|
it("#{db_it.description} (prepared_statements=#{prepared_statements})", db_it.file, db_it.line, db_it.end_line) do
ctx.with_db "prepared_statements=#{prepared_statements}" do |db|
db_it.block.call db
nil
{% if compare_versions(Crystal::VERSION, "1.9.0") >= 0 %}
values.each do |prepared_statements|
it("#{db_it.description} (prepared_statements=#{prepared_statements})", db_it.file, db_it.line, db_it.end_line) do
ctx.with_db "prepared_statements=#{prepared_statements}" do |db|
db_it.block.call db
nil
end
end
end
end
{% end %}
end
else
raise "Invalid prepared value. Allowed values are :both and :default"
end
end
end