fediglam/src/sql/engines/postgres.zig

251 lines
8.4 KiB
Zig

const std = @import("std");
const util = @import("util");
const common = @import("./common.zig");
const c = @import("./postgres/c.zig");
const errors = @import("./postgres/errors.zig");
const Allocator = std.mem.Allocator;
/// Cursor over the rows of a libpq query result.
///
/// Wraps a `PGresult` handle and hands its rows out one at a time via `row`.
/// Call `finish` when done; it clears the handle that every `Row` produced by
/// this cursor also points at.
pub const Results = struct {
    /// libpq result handle that every accessor reads from.
    result: *c.PGresult,
    /// Index of the row that the next call to `row` will hand out.
    next_row_index: c_int = 0,

    /// Returns the next unread row, or `null` once all rows have been returned.
    pub fn row(self: *Results) common.RowError!?Row {
        if (self.next_row_index >= self.rowCount()) return null;

        const idx = self.next_row_index;
        self.next_row_index += 1;
        return Row{
            .result = self.result,
            .row_index = idx,
        };
    }

    /// Number of rows in the result, as reported by `PQntuples`.
    fn rowCount(self: Results) c_int {
        return c.PQntuples(self.result);
    }

    /// Number of columns in the result.
    ///
    /// Returns `error.OutOfRange` if the count reported by `PQnfields` does
    /// not fit in a `u15`.
    pub fn columnCount(self: Results) common.ColumnCountError!u15 {
        return std.math.cast(u15, c.PQnfields(self.result)) orelse error.OutOfRange;
    }

    /// Looks up the index of the column called `name` using `PQfnumber`.
    ///
    /// Returns `error.NotFound` when libpq reports no such column, or when
    /// `name` is longer than 255 bytes (too long for the lookup buffer), and
    /// `error.OutOfRange` when the index does not fit in a `u15`.
    pub fn columnIndex(self: Results, name: []const u8) common.ColumnIndexError!u15 {
        // PQfnumber takes a NUL-terminated C string, but `name` is a plain
        // slice with no terminator guarantee. Copy it into a terminated stack
        // buffer so libpq never reads past the end of the caller's memory.
        const max_name_len = 255;
        if (name.len > max_name_len) return error.NotFound;

        var name_buf: [max_name_len + 1]u8 = undefined;
        std.mem.copy(u8, name_buf[0..name.len], name);
        name_buf[name.len] = 0;
        const name_z = name_buf[0..name.len :0];

        const idx = c.PQfnumber(self.result, name_z.ptr);
        if (idx == -1) return error.NotFound;
        return std.math.cast(u15, idx) orelse error.OutOfRange;
    }

    /// Releases the underlying result with `PQclear`.
    pub fn finish(self: Results) void {
        c.PQclear(self.result);
    }
};
/// Maps the SQLSTATE attached to a failed `PGresult` onto a `common.RowError`.
///
/// The SQLSTATE is read from the result, parsed, logged, and then translated
/// by error class (and, for a few classes, by the specific state). Returns
/// `error.Unexpected` when the code cannot be parsed or its class has no
/// mapping in the table below.
fn handleError(result: *c.PGresult) common.RowError {
    const sqlstate_text = c.PQresultErrorField(result, c.PG_DIAG_SQLSTATE);
    const sql_state = errors.SqlState.parse(sqlstate_text) catch {
        std.log.err("Database returned invalid error code {?s}", .{sqlstate_text});
        return error.Unexpected;
    };
    const error_class = sql_state.errorClass();

    // TODO: This will crash if a value not defined in Postgres 14 is returned.
    // See https://github.com/ziglang/zig/issues/12845
    // If that issue is not accepted, redo this with a comptime string map or
    // something similar.
    std.log.err(
        "Database returned error code {s}: Class {s} error {s}",
        .{ sqlstate_text, @tagName(error_class), @tagName(sql_state) },
    );

    const mapped: common.RowError = switch (error_class) {
        // Classes whose mapping depends on the specific state.
        .with_check_option_violation,
        .integrity_constraint_violation,
        => switch (sql_state) {
            .not_null_violation => error.NotNullViolation,
            .foreign_key_violation => error.ForeignKeyViolation,
            .unique_violation => error.UniqueViolation,
            .check_violation => error.CheckViolation,
            else => error.ConstraintViolation,
        },

        .syntax_error_or_access_rule_violation => switch (sql_state) {
            .insufficient_privilege => error.PermissionDenied,
            else => error.SqlException,
        },

        .object_not_in_prerequisite_state => switch (sql_state) {
            .lock_not_available => error.DatabaseBusy,
            else => error.SqlException,
        },

        .operator_intervention => switch (sql_state) {
            .query_canceled => error.Cancelled,
            else => error.InternalException,
        },

        // Classes that map to a single error regardless of the state.
        .invalid_authorization_specification,
        .invalid_grantor,
        => error.PermissionDenied,

        .insufficient_resources,
        .connection_exception,
        .system_error,
        .config_file_error,
        .internal_error,
        => error.InternalException,

        .triggered_action_exception,
        .feature_not_supported,
        .invalid_transaction_initiation,
        .locator_exception,
        .cardinality_violation,
        .data_exception,
        .invalid_transaction_state,
        .invalid_sql_statement_name,
        .triggered_data_change_violation,
        .dependent_privilege_descriptors_still_exist,
        .invalid_transaction_termination,
        .sql_routine_exception,
        .invalid_cursor_name,
        .external_routine_exception,
        .external_routine_invocation_exception,
        .savepoint_exception,
        .invalid_catalog_name,
        .invalid_schema_name,
        .transaction_rollback, // TODO: consider deadlock avoidance/retry strategy
        .snapshot_too_old,
        .plpgsql_error,
        => error.SqlException,

        else => error.Unexpected,
    };
    return mapped;
}
/// One row of a query result, identified by its index inside a `PGresult`.
pub const Row = struct {
    /// Result handle the row's values are read from.
    result: *c.PGresult,
    /// Index of this row within `result`.
    row_index: c_int,

    /// Reads column `idx` of this row and parses it as a `T`.
    ///
    /// A SQL NULL yields `null` when `T` is an optional type and
    /// `error.ResultTypeMismatch` otherwise. `error.Unexpected` is returned
    /// if libpq hands back no value pointer or a length that does not fit in
    /// a `usize`. `alloc` is forwarded unchanged to
    /// `common.parseValueNotNull`.
    pub fn get(self: Row, comptime T: type, idx: u16, alloc: ?Allocator) common.GetError!T {
        // Deal with NULL first; only optional result types can represent it.
        if (c.PQgetisnull(self.result, self.row_index, idx) != 0) {
            if (@typeInfo(T) == .Optional) {
                return null;
            } else {
                return error.ResultTypeMismatch;
            }
        }

        const raw = c.PQgetvalue(self.result, self.row_index, idx);
        if (raw == null) return error.Unexpected;

        const reported_len = c.PQgetlength(self.result, self.row_index, idx);
        const byte_count = std.math.cast(usize, reported_len) orelse return error.Unexpected;

        return try common.parseValueNotNull(alloc, T, raw[0..byte_count]);
    }
};
/// An open PostgreSQL connection driven through libpq.
pub const Db = struct {
    /// libpq connection handle used for every query.
    conn: *c.PGconn,

    /// Connects using the libpq connection string `conn_str`.
    ///
    /// Returns `error.BadConnection` when libpq reports `CONNECTION_BAD` and
    /// `error.Unexpected` when no connection object could be created or the
    /// status is anything other than OK/BAD. The connection is finished
    /// again on every error path.
    pub fn open(conn_str: [:0]const u8) common.OpenError!Db {
        const handle = c.PQconnectdb(conn_str.ptr) orelse {
            std.log.err("Unable to connect to database", .{});
            return error.Unexpected;
        };
        errdefer c.PQfinish(handle);

        std.log.info("Connecting to database using provided connection string...", .{});

        const conn_status = c.PQstatus(handle);
        switch (conn_status) {
            c.CONNECTION_OK => {},
            c.CONNECTION_BAD => {
                std.log.err("Error connecting to database: {?s}", .{c.PQerrorMessage(handle)});
                return error.BadConnection;
            },
            else => {
                std.log.err("Unexpected PQstatus {}: {?s}", .{ conn_status, c.PQerrorMessage(handle) });
                return error.Unexpected;
            },
        }

        std.log.info("DB connection established", .{});
        return Db{ .conn = handle };
    }

    /// Closes the connection with `PQfinish`.
    pub fn close(self: Db) void {
        c.PQfinish(self.conn);
    }

    // Format codes passed to PQexecParams.
    const format_text = 0;
    const format_binary = 1;

    /// Executes `sql` with the tuple `args` bound as parameters and returns
    /// the result, requested in text format.
    ///
    /// When `args` is non-empty, `opt.allocator` must be set
    /// (`error.AllocatorRequired` otherwise); it backs a temporary arena
    /// holding the encoded parameters, which is freed before this function
    /// returns. On success the returned `Results` wraps the `PGresult`; on
    /// every error path the result is cleared here.
    pub fn exec(self: Db, sql: [:0]const u8, args: anytype, opt: common.QueryOptions) common.ExecError!Results {
        const maybe_result = blk: {
            if (@TypeOf(args) != void and args.len > 0) {
                var arena = std.heap.ArenaAllocator.init(opt.allocator orelse return error.AllocatorRequired);
                defer arena.deinit();

                const param_values = try arena.allocator().alloc(?[*:0]const u8, args.len);

                // TODO: Iterating the fields is a fix for the stage1 compiler.
                // Remove it and go back to:
                //inline for (args) |arg, i| {
                inline for (std.meta.fields(@TypeOf(args))) |field, i| {
                    const arg = @field(args, field.name);
                    // The stage1 compiler has trouble with runtime branches
                    // that involve compile-time values in any way, so the
                    // error is not propagated on that backend.
                    const encoded = if (@import("builtin").zig_backend == .stage1)
                        common.prepareParamText(&arena, arg) catch unreachable
                    else
                        try common.prepareParamText(&arena, arg);
                    param_values[i] = if (encoded) |text| text.ptr else null;
                }

                break :blk c.PQexecParams(
                    self.conn,
                    sql.ptr,
                    @intCast(c_int, param_values.len),
                    null,
                    param_values.ptr,
                    null,
                    null,
                    format_text,
                );
            } else {
                break :blk c.PQexecParams(self.conn, sql.ptr, 0, null, null, null, null, format_text);
            }
        };

        const result = maybe_result orelse {
            std.log.err("Error occurred in sql query: {?s}", .{c.PQerrorMessage(self.conn)});
            return error.Unexpected;
        };
        // From here on, any error return must release the result.
        errdefer c.PQclear(result);

        const result_status = c.PQresultStatus(result);
        switch (result_status) {
            c.PGRES_COMMAND_OK,
            c.PGRES_TUPLES_OK,
            => return Results{ .result = result },

            c.PGRES_EMPTY_QUERY => return error.SqlException,

            c.PGRES_BAD_RESPONSE => {
                std.log.err("Database returned invalid response: {?s}", .{c.PQresultErrorMessage(result)});
                return error.InternalException;
            },

            c.PGRES_FATAL_ERROR => return handleError(result),

            else => {
                std.log.err(
                    "Unexpected PQresultStatus {?s} ({}): {?s}",
                    .{ c.PQresStatus(result_status), result_status, c.PQresultErrorMessage(result) },
                );
                return error.Unexpected;
            },
        }
    }
};