fediglam/src/sql/postgres.zig

121 lines
3.9 KiB
Zig

const std = @import("std");
const util = @import("util");
const common = @import("./common.zig");
const c = @cImport({
@cInclude("libpq-fe.h");
});
const Allocator = std.mem.Allocator;
pub const Results = struct {
result: *c.PGresult,
next_row_index: c_int = 0,
pub fn rowCount(self: Results) usize {
return @intCast(usize, c.PQntuples(self.result));
}
pub fn row(self: *Results) !?Row {
if (self.next_row_index >= self.rowCount()) return null;
const idx = self.next_row_index;
self.next_row_index += 1;
return Row{
.result = self.result,
.row_index = idx,
};
}
pub fn finish(self: Results) void {
c.PQclear(self.result);
}
};
pub const Row = struct {
result: *c.PGresult,
row_index: c_int,
pub fn get(self: Row, comptime T: type, idx: u16, alloc: ?Allocator) !T {
const val = c.PQgetvalue(self.result, self.row_index, idx);
const is_null = (c.PQgetisnull(self.result, self.row_index, idx) != 0);
if (is_null) {
return if (@typeInfo(T) == .Optional) null else error.NullValue;
}
if (val == null) return error.Unexpected;
const len = @intCast(usize, c.PQgetlength(self.result, self.row_index, idx));
return try common.parseValueNotNull(alloc, T, val[0..len]);
}
};
pub const Db = struct {
conn: *c.PGconn,
pub fn open(conn_str: [:0]const u8) !Db {
const conn = c.PQconnectdb(conn_str.ptr) orelse {
std.log.err("Unable to connect to database", .{});
return error.UnknownError;
};
errdefer c.PQfinish(conn);
std.log.info("Connecting to database using provided connection string...", .{});
loop: while (true) {
switch (c.PQstatus(conn)) {
c.CONNECTION_OK => break :loop,
c.CONNECTION_BAD => {
std.log.err("Error connecting to database", .{});
return error.BadConnection;
},
else => std.os.nanosleep(0, 100 * 1000 * 1000), // 100 ms
}
}
std.log.info("DB connection established", .{});
return Db{ .conn = conn };
}
pub fn close(self: Db) void {
c.PQfinish(self.conn);
}
const format_text = 0;
const format_binary = 1;
pub fn exec(self: Db, sql: [:0]const u8, args: anytype, alloc: Allocator) !Results {
const result = blk: {
var arena = std.heap.ArenaAllocator.init(alloc);
defer arena.deinit();
const params = try arena.allocator().alloc(?[*]const u8, args.len);
inline for (args) |a, i| params[i] = if (try common.prepareParamText(arena, a)) |slice| slice.ptr else null;
break :blk c.PQexecParams(self.conn, sql.ptr, @intCast(c_int, params.len), null, params.ptr, null, null, format_text);
} orelse {
std.log.err("Error occurred in sql query: {?s}", .{c.PQerrorMessage(self.conn)});
return error.UnknownError;
};
errdefer c.PQclear(result);
const status = c.PQresultStatus(result);
return switch (status) {
c.PGRES_EMPTY_QUERY => error.EmptyQuery,
c.PGRES_FATAL_ERROR => blk: {
std.log.err("Error occurred in sql query: {?s}", .{c.PQresultErrorMessage(result)});
break :blk error.UnknownError;
},
c.PGRES_BAD_RESPONSE => blk: {
std.log.err("Error occurred in sql query: {?s}", .{c.PQresultErrorMessage(result)});
break :blk error.BadResponse;
},
c.PGRES_COMMAND_OK, c.PGRES_TUPLES_OK, c.PGRES_COPY_OUT, c.PGRES_COPY_IN, c.PGRES_COPY_BOTH => return Results{ .result = result }, //TODO
c.PGRES_SINGLE_TUPLE => unreachable, // Not yet supported
else => unreachable,
};
}
};