import nodePath from 'node:path';
import fs from 'node:fs';
import { strict as assert } from "node:assert";
import { ZipFS } from "./zipFs.ts";
import { $, ProcessOutput, quote } from "zx";
import { parallel } from "./parallel.ts";

$.verbose = false;

/**The subset of the node `fs` API that TaskTarget (and glob) rely on. Node's
 * own `fs` satisfies it; ZipFS provides an implementation rooted in a zip.*/
type FSImpl = {
  isZip?: boolean;
  zipPath?: string;
  init?(): Promise<void>;
  ready?: boolean;

  globSync: typeof fs["globSync"];
  statSync: typeof fs["statSync"];
  existsSync: typeof fs["existsSync"];

  // Required by glob
  lstatSync: typeof fs["lstatSync"];
  // Needs to include withFileTypes DirEnt variant
  readdir: typeof fs["readdir"];
  readdirSync: typeof fs["readdirSync"];
  readlinkSync: typeof fs["readlinkSync"];
  realpathSync: typeof fs["realpathSync"];
  promises: {
    lstat: typeof fs.promises["lstat"];
    // Needs to include withFileTypes DirEnt
    readdir: typeof fs.promises["readdir"];
    readlink: typeof fs.promises["readlink"];
    realpath: typeof fs.promises["realpath"];
  }
};

const defaultFSImpl = fs;

/**Replaces every character outside [a-zA-Z0-9_] with "_"*/
function safe(s: string) {
  return s.replace(/[^a-zA-Z0-9_]/g, '_');
}

/**One stage of a TaskTarget's shell pipeline*/
interface TaskTargetOp {
  /**"read" ops produce the initial stdout and must come first; "mid" ops consume stdin*/
  type: "read" | "mid";
  /**Shell text for this stage; empty/undefined stages are dropped from the pipeline*/
  toShell(target: TaskTarget): string | undefined;
  clone(): TaskTargetOp;
}

/**Pipeline stage that writes the target's file contents to stdout*/
class TaskTargetRead implements TaskTargetOp {
  get type() { return "read" as const; }

  toShell(target: TaskTarget) {
    if (target.fsImpl.isZip) {
      // Read the file to stdout from the target inside the zip file
      // This relies on the internals of fsImpl a bit to have the path to
      // the root zip so we can create a command against it
      assert(target.fsImpl.zipPath, "Should have a zipPath");
      return `7z x ${quote(target.fsImpl.zipPath)} -so ${quote(target.path)}`;
    }
    return `cat ${quote(target.path)}`;
  }

  clone() {
    return new TaskTargetRead();
  }
}

type ValidCmd = string | string[] | ((t: TaskTarget)=>string) | ((t: TaskTarget)=>string[]);

/**Pipeline stage that runs an arbitrary command over stdin*/
class TaskTargetCmd implements TaskTargetOp {
  get type() { return "mid" as const; }

  /**What nodejs spawn() and execFile() take
   * [cmd, ...args]: string[]
   * A string is split on whitespace (so it cannot carry quoted arguments);
   * a function is resolved against the target at toShell() time*/
  cmd: ValidCmd;

  /**Resolves a ValidCmd into an argv-style array for `target`*/
  static parse(target: TaskTarget, v: ValidCmd): string[] {
    const resolved = typeof v === "function" ? v(target) : v;
    if (typeof resolved === "string") {
      // Drop the empty tokens that leading/trailing whitespace (e.g. from an
      // indented template literal) produces; quoted, they would become ''
      // commands or arguments in the shell line
      return resolved.split(/\s+/).filter(s => s.length > 0);
    }
    return resolved;
  }

  constructor(cmd: ValidCmd) {
    this.cmd = cmd;
  }

  toShell(target: TaskTarget) {
    return TaskTargetCmd.parse(target, this.cmd)
      // Newlines are stripped from every argument before quoting
      .map(c => quote(c.replace(/\n/g, "")))
      .join(" ");
  }

  clone() {
    return new TaskTargetCmd(this.cmd);
  }
}

type ValidId = string | ((t: TaskTarget)=>string);

export const COLUMN_TYPES = {
  /**A numeric value*/
  "numeric": {},
  /**ISO Datetime*/
  "isodatetime": {},
  /**Urls*/
  "url": {},
  /**Freetext*/
  "text": {},
  /**For anything untyped*/
  "any": {},
  /**The sender/originator of a row (maps to Owner in Timelinize)*/
  "sender": {},
  /**The receiver/recipient of a row (maps to RelSent entity in Timelinize)*/
  "receiver": {},
  /**Latitude coordinate*/
  "lat": {},
  /**Longitude coordinate*/
  "lng": {},
  "TODO": {}
};

/**Column metadata. Just a string into the TYPES*/
type ColumnMeta = (keyof typeof COLUMN_TYPES | undefined);

// Make non-optional version of just the metadata values of TaskTarget
type TaskTargetMeta = Required<Pick<TaskTarget,
  "idValue" | "perRowDescription" | "perRowTags" | "columnMeta" |
  "aggregate" | "aggregateColumns" | "metaIdValue">>;

/**A path (on the real fs or inside a zip) plus the shell pipeline that turns
 * it into an output table, along with that table's metadata*/
export class TaskTarget {
  /**The current path pointed to by this TaskTarget*/
  path: string;
  /**The fsImpl used to access the .path*/
  fsImpl: FSImpl = defaultFSImpl;
  /**The pipeline of things to do to the above path to get an stdout of the output*/
  pipeline: TaskTargetOp[];

  // == Metadata, user configurable, no good defaults ==
  /**Id of the TaskTarget
   * string - Static id
   * fn returning string - Function can derive the id from a task target even after a glob() and cd() operation
   **/
  idValue?: ValidId;
  /**For every output CSV, this defines a description of that CSV per-row
   * Use the items {0}, {1} to template (0-indexed column positions)
   * Example: For a CSV with a row format like ["time", "sender", "sendee", "message"]
   * you might do something like '"{3}" sent from {1} to {2}'
   * */
  perRowDescription?: string;
  /**A CSV of tags that is added to every row of the table (TODO: no template functionality currently)*/
  perRowTags?: string;
  /**Metadata about the columns*/
  columnMeta?: ColumnMeta[];
  /**Whether or not to aggregate to a single task (everything with the id value idValue)*/
  aggregate?: boolean;
  /**Names of the columns to aggregate with*/
  aggregateColumns?: string[];
  /**A metadata TaskTarget for this TaskTarget, if one exists*/
  metaIdValue?: ValidId;

  constructor(path: string) {
    this.path = path;
    this.pipeline = [];
  }

  exists() {
    return this.fsImpl.existsSync(this.path);
  }

  /**Resolves `path` against the current path unless it is already absolute*/
  _joinPath(path: string) {
    if (path.startsWith('/')) {
      return path;
    }
    return nodePath.join(this.path, path);
  }

  get basename() {
    return safe(nodePath.basename(this.path));
  }

  /**The last `n` path segments, each made safe, joined with "___"*/
  basenameN(n: number) {
    return this.path
      .split("/")
      .map(s => safe(s))
      .slice(-n)
      .join("___");
  }

  /**Safe id of this target; asserts that an idValue was assigned*/
  get id() {
    assert(this.idValue, `TaskTarget for path "${this.path}" must have an id`);
    if (typeof this.idValue === "function") {
      return safe(this.idValue(this));
    }
    return safe(this.idValue);
  }

  /**Safe metaId of this target, or undefined if none was assigned*/
  get metaId() {
    if (!this.metaIdValue) {
      return undefined;
    }
    if (typeof this.metaIdValue === "function") {
      return safe(this.metaIdValue(this));
    }
    return safe(this.metaIdValue);
  }

  /**Changes the current directory of the target*/
  cd(path: string): TaskTarget {
    this.path = this._joinPath(path);
    return this;
  }

  /**Unzips the file pointed to by the current TaskTarget (mutates this target)*/
  async unzip(): Promise<TaskTarget> {
    const zfs = new ZipFS(this.path);
    await zfs.init();
    this.path = ""; // target is now rooted at the base of its respective zipfs
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- ZipFS impl is structurally FSImpl
    this.fsImpl = zfs.getImpl() as any;
    return this;
  }

  /**Get a glob off of the target. The returned targets share this target's
   * fsImpl but start with an empty pipeline and no metadata*/
  glob(globPath: string): TaskTarget[] {
    globPath = this._joinPath(globPath);
    const items = this.fsImpl.globSync(globPath);
    const ret = items.map(i => new TaskTarget(i));
    // TODO: This should probably clone()
    ret.forEach(t => t.fsImpl = this.fsImpl); // Should all use the same fsImpl
    return ret;
  }

  /**Clones the TaskTarget (pipeline ops and metadata arrays are copied)*/
  clone(): TaskTarget {
    const t = new TaskTarget(this.path);
    t.fsImpl = this.fsImpl; // holds no state, just needs same impl
    t.pipeline = this.pipeline.map(p => p.clone());
    // metadata
    t.idValue = this.idValue;
    t.perRowDescription = this.perRowDescription;
    t.perRowTags = this.perRowTags;
    t.columnMeta = this.columnMeta?.slice();
    t.metaIdValue = this.metaIdValue;
    t.aggregate = this.aggregate;
    t.aggregateColumns = this.aggregateColumns?.slice();
    return t;
  }

  pushToPipeline(v: TaskTargetOp) {
    if (v.type === "read") {
      assert(this.pipeline.length === 0, "A read can only be the first item in a pipeline");
    }
    this.pipeline.push(v);
  }

  /**The full pipeline as a single `a | b | c` shell string*/
  toShell() {
    return this.pipeline
      .map(p => p.toShell(this))
      .filter(p => !!p) // remove empty strings and undefined
      .join(" | ");
  }

  cmd(cmd: ValidCmd) {
    this.pushToPipeline(new TaskTargetCmd(cmd));
    return this;
  }

  read() {
    this.pushToPipeline(new TaskTargetRead());
    return this;
  }

  /**Merges `meta` into this target. Only keys present in `meta` are written,
   * so earlier assignments of other keys are kept*/
  assignMeta(meta: Partial<TaskTargetMeta>) {
    const { columnMeta, aggregateColumns, ...rest } = meta;
    Object.assign(this, rest);
    // Copy arrays so no object references are shared with the caller. Guard
    // on presence: spreading `columnMeta: undefined` unconditionally (as
    // Object.assign copies undefined-valued keys) would wipe a previously
    // assigned columnMeta whenever meta omits it
    if ("columnMeta" in meta) {
      this.columnMeta = columnMeta?.slice();
    }
    if ("aggregateColumns" in meta) {
      this.aggregateColumns = aggregateColumns?.slice();
    }
    return this;
  }
}

export interface PipelineOp {
  (targets: TaskTarget[]): TaskTarget[] | Promise<TaskTarget[]>;
}

export function cd(path: string): PipelineOp {
  return (targets: TaskTarget[]) => targets.map(t => t.clone().cd(path));
}

export function glob(globPath: string): PipelineOp {
  return (targets: TaskTarget[]) => targets.map(t => t.glob(globPath)).flat();
}

export function unzip(): PipelineOp {
  // Clone first like every other op: TaskTarget.unzip() mutates the target,
  // and branch() hands the same input targets to every branch
  return async (targets: TaskTarget[]) => Promise.all(targets.map(t => t.clone().unzip()));
}

export function read(): PipelineOp {
  return (targets: TaskTarget[]) => targets.map(t => t.clone().read());
}

export function cmd(cmd: ValidCmd): PipelineOp {
  return (targets: TaskTarget[]) => targets.map(t => t.clone().cmd(cmd));
}

export function assignMeta(meta: Partial<TaskTargetMeta>): PipelineOp {
  return (targets: TaskTarget[]) => targets.map(t => t.clone().assignMeta(meta));
}

export function each(fn: (t: TaskTarget)=>TaskTarget): PipelineOp {
  return (targets: TaskTarget[]) => targets.map(fn);
}

/**Runs ops in sequence, feeding each op's output targets into the next*/
export function pipe(...ops: PipelineOp[]): PipelineOp {
  return async (targets: TaskTarget[]) => {
    for (const op of ops) {
      targets = await op(targets);
    }
    return targets;
  };
}

/**Runs every op on the same input targets concurrently and concatenates the results*/
export function branch(...ops: PipelineOp[]): PipelineOp {
  return async (targets: TaskTarget[]) => {
    const targetsArrays = await Promise.all(ops.map(op => op(targets)));
    return targetsArrays.flat();
  };
}

/**branch() over the ops yielded by genFn (the generator is consumed once, up front)*/
export function branchGen(genFn: ()=>Generator<PipelineOp>): PipelineOp {
  const opsToBranch = Array.from(genFn());
  return (targets: TaskTarget[]) => {
    return branch(...opsToBranch)(targets);
  };
}

export async function execPaths(entries: ({path: string, op: PipelineOp })[]) {
  return (await Promise.all(
    // Map every entry path into a TaskTarget and run the PipelineOp with
    // that TaskTarget
    entries
      .map(async ({path, op}) => {
        const targets = [new TaskTarget(path)];
        return await op(targets);
      })
  )).flat();
}

/**Filters targets down to the runnable ones. Targets with an empty pipeline
 * are dropped silently; targets whose path does not exist are dropped with a
 * console warning. Nothing is thrown. Also initializes any not-yet-ready fsImpl*/
export async function verify(targets: TaskTarget[]) {
  const outTargets: TaskTarget[] = [];
  for (const t of targets) {
    // Make sure fsImpl is ready
    // TODO: DO NOT PUT THIS IN VERIFY, this should go somewhere in the task building stuff...
    if ("ready" in t.fsImpl && !t.fsImpl.ready && t.fsImpl.init) {
      await t.fsImpl.init();
    }
    // TODO: Probably remove or assert as incorrect
    if (t.pipeline.length <= 0) {
      continue; // Tasks with empty pipelines are no-ops, remove
    }
    if (!t.exists()) {
      console.warn(`Missing target ${t.path}`);
      continue;
    }

    outTargets.push(t);
  }
  return outTargets;
}

export interface ProcessOutputAggregate {
  stdout: string;
  stderr: string;
  exitCodes: (number | null)[];
  duration: number;
  ok: boolean;
}
export interface ProcessOutputSimple {
  stdout: string;
  stderr: string;
  exitCode: number;
  duration: number;
  ok: boolean;
}

/**Folds one process output into an aggregate. The first output gets the
 * aggregateColumns header line prepended; later ones are appended*/
function combineProcessOutputAggregate(poa: ProcessOutputAggregate | undefined, t: TaskTarget, po: ProcessOutput): ProcessOutputAggregate {
  if (!poa) {
    assert(t.aggregateColumns, "aggregate TaskTarget must have aggregateColumns");
    const headers = t.aggregateColumns.join(",") + "\n";
    return {
      stdout: headers + po.stdout,
      stderr: po.stderr,
      exitCodes: [po.exitCode],
      duration: po.duration,
      ok: po.ok
    };
  }

  // Comes with a builtin "\n" from jq on stdout and stderr, no need to add
  // a trailing one
  poa.stdout += po.stdout;
  poa.stderr += po.stderr;
  poa.exitCodes.push(po.exitCode);
  poa.duration += po.duration;
  poa.ok &&= po.ok;
  return poa;
}

export interface RunOutput {
  target: TaskTarget,
  result: ProcessOutput | ProcessOutputAggregate | ProcessOutputSimple
}

/**Runs the target's pipeline through `bash -c`; a non-zero exit does not throw*/
export async function run(target: TaskTarget): Promise<ProcessOutput> {
  const command = target.toShell();
  return await $({ nothrow: true })`bash -c ${command}`;
}

/**Quotes a CSV field (RFC 4180) when it contains a quote, comma or line
 * break; undefined becomes an empty field*/
function csvEscape(s: string | undefined) {
  if (s === undefined) {
    return "";
  }
  if (/[",\r\n]/.test(s)) {
    return `"${s.replace(/"/g, "\"\"")}"`;
  }
  return s;
}

/**One metadata CSV row: id,perRowDescription,perRowTags,columnMeta,metaId*/
function metadataCSVRow(t: TaskTarget) {
  return [
    t.id,
    t.perRowDescription,
    t.perRowTags,
    t.columnMeta?.join(",") ?? "",
    t.metaId
  ].map(csvEscape).join(",") + "\n";
}

/**Verifies and runs every target, merges aggregate targets by id, and appends
 * a metadata table describing every output table*/
export async function runAll(targets: TaskTarget[]): Promise<RunOutput[]> {
  const finalTargets = await verify(targets);
  const results = await parallel(finalTargets, run, true);

  const nonAggregateTargets: TaskTarget[] = finalTargets.filter(t => !t.aggregate);
  const nonAggregateResults: RunOutput[] = [];
  const aggregateResultsMap: Record<string, RunOutput> = {};

  // == Aggregate tables ==
  // Some TaskTargets have .aggregate: true, which means they should all be combined
  // into a single task with the id of the .id property
  for (const [idx, r] of results.entries()) {
    const t = finalTargets[idx];
    if (!t.aggregate) {
      nonAggregateResults.push({
        target: t,
        result: r
      });
      continue;
    }

    const aggregateId = t.id;
    const prevResult = aggregateResultsMap[aggregateId]?.result;
    aggregateResultsMap[aggregateId] = {
      target: t, // Use target t for metadata, so it will use the last target
      result: combineProcessOutputAggregate(prevResult as (ProcessOutputAggregate | undefined), t, r)
    };
  }
  const aggregateResults: RunOutput[] = Object.values(aggregateResultsMap);

  // == Metadata table ==
  // Each output table gets one row of its perRowDescription and other
  // per-table metadata. Aggregate tables contribute one row each (from the
  // target kept on their RunOutput), appended after the non-aggregate rows
  let metadataCSV = "id,perRowDescription,perRowTags,columnMeta,metaId\n";
  for (const t of nonAggregateTargets) {
    metadataCSV += metadataCSVRow(t);
  }
  for (const { target } of aggregateResults) {
    metadataCSV += metadataCSVRow(target);
  }

  // Won't be removed by verify() because we're adding it after that's used
  // TODO: Would be nice to bake this into TaskTarget/verify for tasks that dont point
  // to a real path
  const metadataTarget = new TaskTarget("");
  metadataTarget
    // id, perRowDescription, perRowTags, columnMeta, metaId
    .assignMeta({
      idValue: "base_data_manager_metadata",
      columnMeta: ["any", "any", "any", "any", "any"],
      perRowTags: "internal",
    });
  const metadataResult: ProcessOutputSimple = {
    stdout: metadataCSV,
    stderr: "",
    exitCode: 0,
    duration: 0, // TODO
    ok: true
  };
  const metadataRunOutput: RunOutput = { target: metadataTarget, result: metadataResult };

  return aggregateResults.concat(nonAggregateResults).concat(metadataRunOutput);
}