* aggregateId is now metadata and it's just aggregate: boolean and uses .id instead * Use csv-parse for tests * Update test snapshots
52 lines
1.7 KiB
TypeScript
52 lines
1.7 KiB
TypeScript
import fs from 'node:fs/promises';
|
|
import fsSync from 'node:fs';
|
|
import { DatabaseSync } from "node:sqlite";
|
|
import { type ProcessOutputAggregate, type RunOutput, TaskTarget, runAll, type ProcessOutputSimple } from "./task.ts";
|
|
import { ProcessOutput } from 'zx';
|
|
|
|
|
|
async function loadCSVTable(
|
|
db: DatabaseSync,
|
|
target: TaskTarget,
|
|
result: ProcessOutput | ProcessOutputAggregate | ProcessOutputSimple
|
|
) {
|
|
const id = target.id;
|
|
const table = id;
|
|
const tmpPath = `/tmp/${id}.csv`;
|
|
// console.log(`Writing ${tmpPath}`);
|
|
const fd = await fs.open(tmpPath, 'w');
|
|
await fs.writeFile(fd, result.stdout, { encoding: 'utf8' });
|
|
await fd.close();
|
|
// console.log(`Loading ${tmpPath} → table ${table}`);
|
|
|
|
db.exec(`CREATE VIRTUAL TABLE temp.intermediate USING csv(filename='${tmpPath}', header);`);
|
|
db.exec(`CREATE TABLE "${table}" AS SELECT * FROM intermediate;`);
|
|
db.exec(`DROP TABLE IF EXISTS intermediate;`);
|
|
return;
|
|
}
|
|
|
|
// TODO: This should really have the same name throughout the codebase?
|
|
export const runPipeline = runAll;
|
|
|
|
/**
|
|
* @param db Must be a DatabaseSync with the csv.so extension enabled
|
|
*/
|
|
export async function loadIntoDb(db: DatabaseSync, runOutput: RunOutput[]) {
|
|
for (const {result, target} of runOutput) {
|
|
await loadCSVTable(db, target, result);
|
|
}
|
|
}
|
|
export function getDefaultDB(): DatabaseSync {
|
|
const db = new DatabaseSync(":memory:", { allowExtension: true });
|
|
db.loadExtension("/home/cobertos/sqlite-files/csv.so")
|
|
db.enableLoadExtension(false);
|
|
return db;
|
|
}
|
|
export async function dumpDBToDisk(db: DatabaseSync, dumpPath: string) {
|
|
if (fsSync.existsSync(dumpPath)) {
|
|
await fs.unlink(dumpPath); // unlink the old
|
|
}
|
|
|
|
// Dump it all to the path specified
|
|
db.exec(`VACUUM main INTO '${dumpPath}'`);
|
|
}
|