import fs from 'node:fs/promises';
import fsSync from 'node:fs';
import os from 'node:os';
import path from 'node:path';
import { DatabaseSync } from "node:sqlite";
import { type ProcessOutputAggregate, type RunOutput, TaskTarget, runAll, type ProcessOutputSimple } from "./task.ts";
import { ProcessOutput } from 'zx';

/** Quote an SQL identifier (e.g. a table name), doubling any embedded `"`. */
function quoteIdent(name: string): string {
  return `"${name.replace(/"/g, '""')}"`;
}

/** Quote an SQL string literal, doubling any embedded `'`. */
function quoteLiteral(value: string): string {
  return `'${value.replace(/'/g, "''")}'`;
}

/**
 * Write a task's stdout to a temporary CSV file and copy it into a new table
 * named after the task's id.
 *
 * The CSV is exposed through the sqlite `csv` virtual-table module as
 * `temp.intermediate`, then materialized with `CREATE TABLE ... AS SELECT`.
 * The staging virtual table and the temporary file are always removed, even
 * when loading fails, so one bad load does not make every later load fail
 * with "table intermediate already exists".
 *
 * @param db Database with the csv extension already loaded.
 * @param target Task whose `id` becomes the table name.
 * @param result Task output; its `stdout` is parsed as CSV with a header row.
 */
async function loadCSVTable(
  db: DatabaseSync,
  target: TaskTarget,
  result: ProcessOutput | ProcessOutputAggregate | ProcessOutputSimple
): Promise<void> {
  const table = String(target.id);
  // A private temp directory avoids collisions between loads and keeps an id
  // containing '/' or '..' from writing outside the temp directory.
  const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'csv-load-'));
  const tmpPath = path.join(tmpDir, 'data.csv');
  try {
    // console.log(`Writing ${tmpPath}`);
    await fs.writeFile(tmpPath, result.stdout, { encoding: 'utf8' });

    // console.log(`Loading ${tmpPath} → table ${table}`);
    db.exec(`CREATE VIRTUAL TABLE temp.intermediate USING csv(filename=${quoteLiteral(tmpPath)}, header);`);
    try {
      db.exec(`CREATE TABLE ${quoteIdent(table)} AS SELECT * FROM temp.intermediate;`);
    } finally {
      db.exec(`DROP TABLE IF EXISTS temp.intermediate;`);
    }
  } finally {
    // The data has been copied into a real table (or loading failed); the
    // CSV file is no longer needed either way.
    await fs.rm(tmpDir, { recursive: true, force: true });
  }
}

// TODO: Settle on one name for this throughout the codebase (runAll vs runPipeline).
export const runPipeline = runAll;

/**
 * Load every task's CSV output into `db`, creating one table per task named
 * after the task's id. Loads run one at a time, in input order.
 *
 * @param db Must be a DatabaseSync with the csv.so extension enabled
 * @param runOutput Task results paired with the targets that produced them.
 */
export async function loadIntoDb(db: DatabaseSync, runOutput: RunOutput[]): Promise<void> {
  for (const { result, target } of runOutput) {
    await loadCSVTable(db, target, result);
  }
}

/** Location of the compiled sqlite csv virtual-table extension used by default. */
const DEFAULT_CSV_EXTENSION_PATH = "/home/cobertos/sqlite-files/csv.so";

/**
 * Create an in-memory database with the sqlite `csv` extension loaded, then
 * disable further extension loading.
 *
 * @param csvExtensionPath Path to the compiled csv extension; defaults to the
 *   previously hard-coded location so existing callers are unaffected.
 */
export function getDefaultDB(csvExtensionPath: string = DEFAULT_CSV_EXTENSION_PATH): DatabaseSync {
  const db = new DatabaseSync(":memory:", { allowExtension: true });
  db.loadExtension(csvExtensionPath);
  db.enableLoadExtension(false);
  return db;
}

/**
 * Write the contents of `db`'s main schema to a standalone SQLite file at
 * `dumpPath`, replacing any file already there (VACUUM INTO refuses to write
 * over an existing non-empty file).
 *
 * @param db Database to dump.
 * @param dumpPath Destination file path.
 */
export async function dumpDBToDisk(db: DatabaseSync, dumpPath: string): Promise<void> {
  // force: a missing file is not an error; avoids an exists-then-unlink race.
  await fs.rm(dumpPath, { force: true });
  // Bind the path as a parameter rather than splicing it into the SQL text,
  // so paths containing quotes are handled correctly.
  db.prepare(`VACUUM main INTO ?`).run(dumpPath);
}