import nodePath from 'node:path';
import fs from 'node:fs';
import { strict as assert } from "node:assert";
import { ZipFS } from "./zipFs.ts";
import { globSync } from "glob";
import { $, ProcessOutput, ProcessPromise, quote } from "zx";

/**The subset of the `node:fs` API a TaskTarget needs. Satisfied by plain
 * `node:fs` (the default) and by the implementation returned from
 * `ZipFS.getImpl()` after `unzip()`.*/
type FSImpl = {
  /**True when paths are resolved inside a zip archive (see TaskTargetRead)*/
  isZip?: boolean;
  /**Archive path handed to `7z` when reading a zipped target*/
  zipPath?: string;
  /**Optional async initializer; `verify()` awaits it while `ready` is falsy*/
  init?(): Promise<void>;
  ready?: boolean;

  statSync: typeof fs["statSync"];
  existsSync: typeof fs["existsSync"];

  // Required by glob
  lstatSync: typeof fs["lstatSync"];
  // Needs to include withFileTypes DirEnt variant
  readdir: typeof fs["readdir"];
  readdirSync: typeof fs["readdirSync"];
  readlinkSync: typeof fs["readlinkSync"];
  realpathSync: typeof fs["realpathSync"];
  promises: {
    lstat: typeof fs.promises["lstat"];
    // Needs to include withFileTypes DirEnt
    readdir: typeof fs.promises["readdir"];
    readlink: typeof fs.promises["readlink"];
    realpath: typeof fs.promises["realpath"];
  }
};

const defaultFSImpl = fs;

/**Replaces every character outside [a-zA-Z0-9_] with `_`, so the result can
 * be used as an id / manifest key.*/
function safe(s: string) {
  return s.replace(/[^a-zA-Z0-9_]/g, '_');
}

/**One stage of a target's shell pipeline. Stages are joined with " | ".*/
interface TaskTargetOp {
  /**"read" produces the initial stream and may only be the first stage;
   * "mid" stages are piped after it.*/
  type: "read" | "mid";
  toShell(target: TaskTarget): string;
  clone(): TaskTargetOp;
}

/**Pipeline stage that writes the target's file contents to stdout.*/
class TaskTargetRead implements TaskTargetOp {
  get type(){ return "read" as const; }

  toShell(target: TaskTarget) {
    if (target.fsImpl.isZip) {
      // Files inside an archive are extracted straight to stdout with 7z
      assert(target.fsImpl.zipPath, "Should have a zipPath"); // We need to be able to do this
      return `7z x ${quote(target.fsImpl.zipPath)} -so ${quote(target.path)}`;
    }
    return `cat ${quote(target.path)}`;
  }

  clone() {
    return new TaskTargetRead();
  }
}

/**A command given as a whitespace-separated string, an argv array, or a
 * function of the target returning either.*/
type ValidCmd = string | string[] | ((t: TaskTarget)=>string) | ((t: TaskTarget)=>string[]);

/**Pipeline stage that runs an arbitrary command; every argument is shell-quoted.*/
class TaskTargetCmd implements TaskTargetOp {
  get type(){ return "mid" as const; }

  /**What nodejs spawn() and execFile() take
   * [cmd, ...args]: string[] */
  cmd: ValidCmd;

  /**Resolves a ValidCmd to an argv array. Functions are called with `target`;
   * strings are split on runs of whitespace. Empty pieces (produced by
   * leading/trailing whitespace) are dropped so they never become `''` args.*/
  static parse(target: TaskTarget, v: ValidCmd): string[] {
    const resolved = typeof v === "function" ? v(target) : v;
    if (typeof resolved === "string") {
      return resolved.split(/\s+/).filter(arg => arg !== "");
    }
    return resolved;
  }

  constructor(cmd: ValidCmd) {
    this.cmd = cmd;
  }

  toShell(target: TaskTarget) {
    return TaskTargetCmd.parse(target, this.cmd)
      // Newlines are stripped from each argument before quoting
      .map(arg => quote(arg.replace(/\n/g, "")))
      .join(" ");
  }

  clone() {
    return new TaskTargetCmd(this.cmd);
  }
}

/**An id given directly or computed from the target; always passed through safe()*/
type ValidId = string | ((t: TaskTarget)=>string);
/**A post-processing callback registered on a target*/
type PostFn = (t: TaskTarget)=>Promise<unknown>;

/**A file (on disk or inside a zip) plus the shell pipeline to run on it.*/
export class TaskTarget {
  /**Path of the target; after unzip() it is relative to the archive root*/
  path: string;
  fsImpl: FSImpl = defaultFSImpl;
  pipeline: TaskTargetOp[];
  idValue: ValidId | undefined;
  postFns: PostFn[];

  constructor(path: string){
    this.path = path;
    this.pipeline = [];
    this.postFns = [];
  }

  /**Whether `path` exists according to this target's fsImpl*/
  exists() {
    return this.fsImpl.existsSync(this.path);
  }

  /**Joins `path` onto this target's path; absolute paths are returned as-is*/
  _joinPath(path: string) {
    if (path.startsWith('/')) {
      return path;
    }
    return nodePath.join(this.path, path);
  }

  /**safe() version of the last path segment*/
  get basename() {
    return safe(nodePath.basename(this.path));
  }

  /**The last `n` path segments, each passed through safe(), joined with "___"*/
  basenameN(n: number) {
    return this.path
      .split("/")
      .map(s => safe(s))
      .slice(-n)
      .join("___");
  }

  /**The target's id passed through safe(); asserts that an id was set*/
  get id() {
    assert(this.idValue, `TaskTarget for path "${this.path}" must have an id`);
    if (typeof this.idValue === "function") {
      return safe(this.idValue(this));
    }
    return safe(this.idValue);
  }

  /**Changes the current directory of the target (mutates this target)*/
  cd(path: string): TaskTarget {
    this.path = this._joinPath(path);
    return this;
  }

  /**Unzips the file pointed to by the current TaskTarget. Mutates this target:
   * it is re-rooted at the top of the archive's filesystem.*/
  async unzip(): Promise<TaskTarget> {
    const zfs = new ZipFS(this.path);
    await zfs.init();
    this.path = ""; // target is now rooted at the base of its respective zipfs
    this.fsImpl = zfs.getImpl() as any;
    return this;
  }

  /**Get a glob off of the target. Each match becomes a new TaskTarget that
   * shares this target's fsImpl but has an empty pipeline, no id and no post
   * functions.*/
  glob(globPath: string): TaskTarget[] {
    const pattern = this._joinPath(globPath);
    const matches = globSync(pattern, { cwd: '/DUMMYCWD', fs: this.fsImpl });
    // TODO: This should probably clone()
    return matches.map(match => {
      const t = new TaskTarget(match);
      t.fsImpl = this.fsImpl; // Should all use the same fsImpl
      return t;
    });
  }

  /**Clones the TaskTarget. The pipeline stages are cloned; fsImpl is shared,
   * not copied, so the clone reads from the same filesystem.*/
  clone(): TaskTarget {
    const t = new TaskTarget(this.path);
    t.fsImpl = this.fsImpl;
    t.idValue = this.idValue;
    t.postFns = this.postFns.slice();
    t.pipeline = this.pipeline.map(p => p.clone());
    return t;
  }

  /**Appends a stage; a "read" stage is only allowed as the first stage*/
  pushToPipeline(v: TaskTargetOp) {
    if (v.type === "read") {
      assert(this.pipeline.length === 0, "A read can only be the first item in a pipeline");
    }
    this.pipeline.push(v);
  }

  /**The pipeline rendered as a single shell command line*/
  toShell() {
    return this.pipeline
      .map(p => p.toShell(this))
      .join(" | ");
  }

  pushPostFn(fn: PostFn) {
    this.postFns.push(fn);
  }

  cmd(cmd: ValidCmd) {
    this.pushToPipeline(new TaskTargetCmd(cmd));
    return this;
  }

  read() {
    this.pushToPipeline(new TaskTargetRead());
    return this;
  }

  setId(idValue: ValidId) {
    this.idValue = idValue;
    return this;
  }

  /**Registers a post-processing function; returns this for chaining like
   * cmd()/read()/setId()*/
  post(fn: PostFn) {
    this.pushPostFn(fn);
    return this;
  }

  types(
    types: string[]
  ) {
    // TODO:
    return this;
  }

  csvSink(
    summarization?: [string, string][]
  ) {
    // TODO:
    return this;

    // Ingest this csv into the database at the given id
    // this.cmd(t=>["sqlite-utils", "insert", "your.db", t.id, "-", "--csv", "--detect-types"]);
    // Add a post processing function for these targets that prints out the summarization
    // stats
    // this.post(async (t: TaskTarget)=>{
    //   // We only do the first one so far for the summarization
    //   let queryLine: string;
    //   let formatFn: (r: any)=>string;
    //   const [columnName, type] = summarization?.[0] ?? [undefined, undefined];
    //   if (type === "numeric") {
    //     queryLine = `min(${columnName}) as lo, max(${columnName}) as hi, count(*) as n`;
    //     formatFn = (r: any)=>`${r.n} rows from ${r.lo} to ${r.hi} for ${t.id}`;
    //   }
    //   else {
    //     queryLine = `count(*) as n`;
    //     formatFn = (r: any)=>`${r.n} rows for ${t.id}`;
    //   }
    //   const cmd = "sqlite-utils";
    //   const args = ["query", "your.db", `select ${queryLine} from ${t.id}`]
    //   const { stdout, stderr } = await execFile(cmd, args);
    //   const results = JSON.parse(stdout);
    //   const result = results[0]; // should only be one result in the array for this type of query
    //   const logLine = formatFn(result);
    //   (t as any).log = logLine;
    // });
    // return this;
  }
}

export function each(targets: TaskTarget[], fn: (t: TaskTarget)=>void) {
  for (const t of targets) {
    fn(t);
  }
}

/**Maps targets into a plain array (never an Array subclass via species)*/
export function map(targets: TaskTarget[], fn: (t: TaskTarget)=>TaskTarget): TaskTarget[] {
  const newTargets: TaskTarget[] = [];
  for (const t of targets) {
    newTargets.push(fn(t));
  }
  return newTargets;
}

export function cd(targets: TaskTarget[], path: string): TaskTarget[] {
  return targets.map(t => t.clone().cd(path));
}

export function glob(targets: TaskTarget[], globPath: string): TaskTarget[] {
  return targets.map(t => t.glob(globPath)).flat();
}

/**Unzips every target. Like cd/read/cmd/setId, each target is cloned first so
 * the input targets are not mutated.*/
export async function unzip(targets: TaskTarget[]): Promise<TaskTarget[]> {
  return Promise.all(targets.map(t => t.clone().unzip()));
}

export function read(targets: TaskTarget[]): TaskTarget[] {
  return targets.map(t => t.clone().read());
}

export function cmd(targets: TaskTarget[], cmd: ValidCmd): TaskTarget[] {
  return targets.map(t => t.clone().cmd(cmd));
}

export function setId(targets: TaskTarget[], id: ValidId): TaskTarget[] {
  return targets.map(t => t.clone().setId(id));
}

/**Filters targets down to the runnable ones; nothing is thrown.
 * - Initializes each target's fsImpl first if it exposes `ready`/`init` and is not ready.
 * - Silently drops targets with an empty pipeline.
 * - Drops targets whose path does not exist, logging a console warning.*/
export async function verify(targets: TaskTarget[]): Promise<TaskTarget[]> {
  const outTargets: TaskTarget[] = [];
  for (const t of targets) {
    // Make sure fsImpl is ready
    if ("ready" in t.fsImpl && !t.fsImpl.ready && t.fsImpl.init) {
      await t.fsImpl.init();
    }

    // TODO: Probably remove or assert as incorrect
    if (t.pipeline.length <= 0) {
      continue; // Tasks with empty pipelines are no-ops, remove
    }

    if (!t.exists()) {
      console.warn(`Missing target ${t.path}`);
      continue;
    }

    outTargets.push(t);
  }
  return outTargets;
}

/**Writes a manifest for parallel, a TSV where each record is an id + the shell to run
 * @todo Enforce doing a verify before we output?
 */
export function getTSVManifest(targets: TaskTarget[]): string {
  const out: string[] = [];
  for (const t of targets) {
    const shell = t.toShell();
    out.push(`${t.id}\t${shell}`);
  }
  return out.join("\n");
}

/**The manifest as [id, shell] pairs, in target order*/
export function getTaskManifest(targets: TaskTarget[]): [string, string][] {
  const out: [string, string][] = [];
  for (const t of targets) {
    const shell = t.toShell();
    // Plain tuple: an `as const` tuple is readonly and can't be pushed here
    out.push([t.id, shell]);
  }
  return out;
}

/**Moves `a`'s collection membership (if any) over to `b`*/
function collectionSwap(a: TaskTargetPipelineHelper, b: TaskTargetPipelineHelper) {
  if (!a.__collection) {
    return;
  }

  // Remove a, add b
  const collection = a.__collection;
  delete a.__collection;
  collection.delete(a);
  b.__collection = collection;
  collection.add(b);
}

/**Array of targets with chainable versions of the free helper functions.*/
export class TaskTargetPipelineHelper extends Array<TaskTarget> {
  __collection?: Set<TaskTargetPipelineHelper>;

  /**Re-prototypes `t` in place as a TaskTargetPipelineHelper and returns it*/
  static pipeline(t: TaskTarget[]): TaskTargetPipelineHelper {
    if (Object.getPrototypeOf(t) !== TaskTargetPipelineHelper.prototype) {
      Object.setPrototypeOf(t, TaskTargetPipelineHelper.prototype);
    }
    return t as TaskTargetPipelineHelper;
  }

  _fn(fn: (t: TaskTarget[])=>TaskTarget[]): TaskTargetPipelineHelper {
    const p = TaskTargetPipelineHelper.pipeline(this);
    const t = fn(p);
    const p2 = TaskTargetPipelineHelper.pipeline(t);
    collectionSwap(p, p2); // Move collection pointer to the new item, ends always end up in the collection
    return p2;
  }

  async _afn(fn: (t: TaskTarget[])=>Promise<TaskTarget[]>): Promise<TaskTargetPipelineHelper> {
    const p = TaskTargetPipelineHelper.pipeline(this);
    const t = await fn(p);
    const p2 = TaskTargetPipelineHelper.pipeline(t);
    collectionSwap(p, p2); // Move collection pointer to the new item, ends always end up in the collection
    return p2;
  }

  cd(path: string): TaskTargetPipelineHelper {
    return this._fn(t => cd(t, path));
  }

  glob(globPath: string): TaskTargetPipelineHelper {
    return this._fn(t => glob(t, globPath));
  }

  async unzip(): Promise<TaskTargetPipelineHelper> {
    return this._afn(unzip);
  }

  read(): TaskTargetPipelineHelper {
    return this._fn(read);
  }

  cmd(_cmd: ValidCmd): TaskTargetPipelineHelper {
    return this._fn(t => cmd(t, _cmd));
  }

  setId(id: ValidId): TaskTargetPipelineHelper {
    return this._fn(t => setId(t, id));
  }

  types(...args: unknown[]) {
    // TODO: no-op
    return this;
  }

  csvSink(...args: unknown[]) {
    // TODO: no-op
    return this;
  }

  /**
   * Registers this pipeline in `_c`; later transforms move the registration to
   * the resulting pipeline, so `_c` always holds the end of each chain.
   * @todo Nested versions of this don't currently work, but they could if we
   * turn __collection into an array of collections
   */
  collect(_c: Set<TaskTargetPipelineHelper>) {
    this.__collection = _c;
    return this;
  }
}

/**Runs the target's pipeline via `bash -c`. `nothrow` is set, so a non-zero
 * exit resolves (check `exitCode` on the result) instead of rejecting.*/
export async function run(target: TaskTarget): Promise<ProcessOutput> {
  const command = target.toShell();
  return await $({ nothrow: true })`bash -c ${command}`;
}