487 lines
No EOL
14 KiB
TypeScript
487 lines
No EOL
14 KiB
TypeScript
import nodePath from 'node:path';
|
|
import fs from 'node:fs';
|
|
import { strict as assert } from "node:assert";
|
|
import { ZipFS } from "./zipFs.ts";
|
|
import { $, ProcessOutput, quote } from "zx";
|
|
import { parallel } from "./parallel.ts";
|
|
|
|
// Keep zx quiet: don't echo every spawned command to stdout
$.verbose = false;
/**
 * Minimal filesystem surface a TaskTarget needs. Satisfied by node:fs
 * directly (the default) and by ZipFS for targets rooted inside a zip.
 */
type FSImpl = {
  // True when this impl reads out of a zip archive (checked by TaskTargetRead)
  isZip?: boolean;
  // Path to the backing zip file on disk; used by TaskTargetRead to build
  // a `7z x <zip> -so <entry>` command
  zipPath?: string;
  // Optional async setup; verify() calls it when ready is false
  init?(): Promise<void>;
  // Whether init() has completed (impls without init are always usable)
  ready?: boolean;

  globSync: typeof fs["globSync"];
  statSync: typeof fs["statSync"];
  existsSync: typeof fs["existsSync"];

  // Required by glob
  lstatSync: typeof fs["lstatSync"];
  // Needs to include withFileTypes DirEnt variant
  readdir: typeof fs["readdir"];
  readdirSync: typeof fs["readdirSync"];
  readlinkSync: typeof fs["readlinkSync"];
  realpathSync: typeof fs["realpathSync"];
  promises: {
    lstat: typeof fs.promises["lstat"];
    // Needs to include withFileTypes DirEnt
    readdir: typeof fs.promises["readdir"];
    readlink: typeof fs.promises["readlink"];
    realpath: typeof fs.promises["realpath"];
  }
};

// Plain node:fs is the default implementation for on-disk targets
const defaultFSImpl = fs;
function safe(s: string) {
|
|
return s.replace(/[^a-zA-Z0-9_]/g, '_');
|
|
}
|
|
|
|
|
|
/**One stage of a TaskTarget's shell pipeline.*/
interface TaskTargetOp {
  // "read" stages must be the first item in a pipeline; "mid" stages follow
  type: "read" | "mid";
  // Render this stage as a shell fragment; falsy results are dropped when joining
  toShell(target: TaskTarget): string | undefined;
  // Produce an independent copy (used when a TaskTarget is cloned)
  clone(): TaskTargetOp;
}
class TaskTargetRead implements TaskTargetOp {
|
|
get type(){ return "read" as const; }
|
|
toShell(target: TaskTarget) {
|
|
if (target.fsImpl.isZip) {
|
|
// Read the file to stdout from the target inside the zip file
|
|
// This relies on the internals of fsImpl a bit to have the path to
|
|
// the root zip so we can create a command against it
|
|
assert(target.fsImpl.zipPath, "Should have a zipPath");
|
|
return `7z x ${quote(target.fsImpl.zipPath)} -so ${quote(target.path)}`;
|
|
}
|
|
|
|
return `cat ${quote(target.path)}`;
|
|
}
|
|
clone() {
|
|
return new TaskTargetRead();
|
|
}
|
|
}
|
|
|
|
type ValidCmd = string | string[] | ((t: TaskTarget)=>string) | ((t: TaskTarget)=>string[]);
|
|
class TaskTargetCmd implements TaskTargetOp {
|
|
get type(){ return "mid" as const; }
|
|
/**What nodejs spawn() and execFile() take
|
|
* [cmd, ...args]: string[]
|
|
*/
|
|
cmd: ValidCmd;
|
|
static parse(target: TaskTarget, v: string | string[] | ((t: TaskTarget)=>string) | ((t: TaskTarget)=>string[])): string[] {
|
|
if (typeof v === "function") {
|
|
v = v(target);
|
|
}
|
|
if (typeof v === "string") {
|
|
v = v.split(/\s+/);
|
|
}
|
|
return v;
|
|
}
|
|
constructor(cmd: ValidCmd) {
|
|
this.cmd = cmd;
|
|
}
|
|
toShell(target: TaskTarget) {
|
|
const parsedCmd = TaskTargetCmd.parse(target, this.cmd);
|
|
const out = parsedCmd
|
|
.map(c => {
|
|
let sh = c.replace(/\n/g, "")
|
|
return quote(sh);
|
|
});
|
|
|
|
return out.join(" ");
|
|
}
|
|
clone() {
|
|
return new TaskTargetCmd(this.cmd);
|
|
}
|
|
}
|
|
|
|
/**Id source: a fixed string, or a function deriving the id from the target*/
type ValidId = string | ((t: TaskTarget)=>string);

/**Registry of types a CSV column can be tagged with (only the keys are used)*/
export const COLUMN_TYPES = {
  /**A numeric value*/
  "numeric": {},
  /**ISO Datetime*/
  "isodatetime": {},
  /**Urls*/
  "url": {},
  /**Freetext*/
  "text": {},
  /**For anything untyped*/
  "any": {},
  /**The sender/originator of a row (maps to Owner in Timelinize)*/
  "sender": {},
  /**The receiver/recipient of a row (maps to RelSent entity in Timelinize)*/
  "receiver": {},
  /**Latitude coordinate*/
  "lat": {},
  /**Longitude coordinate*/
  "lng": {},
  /**Placeholder for columns not yet classified*/
  "TODO": {}
};

/**Column metadata. Just a string into the TYPES*/
type ColumnMeta = (keyof typeof COLUMN_TYPES | undefined);
// Make non-optional version of just the metadata values of TaskTarget
type TaskTargetMeta = Required<Pick<TaskTarget, "idValue" | "perRowDescription" | "perRowTags" | "columnMeta" | "aggregate" | "metaIdValue" | "aggregateColumns">>;
export class TaskTarget {
|
|
/**The current path pointed to by this TaskTarget*/
|
|
path: string;
|
|
/**The fsImpl used to access the .path*/
|
|
fsImpl: FSImpl = defaultFSImpl;
|
|
/**The pipeline of things to do to the above path to get an stdout of the output*/
|
|
pipeline: TaskTargetOp[];
|
|
|
|
// == Metadata, user configurable, no good defaults ==
|
|
/**Id of the TaskTarget
|
|
* string - Static id
|
|
* fn returning string - Function can derive the id from a task target even after a glob() and cd() operation
|
|
**/
|
|
idValue?: ValidId;
|
|
/**For every output CSV, this defines a description of that CSV per-row
|
|
* Use the items {0}, {1} to template
|
|
* Example: For a CSV with a row format like ["time", "sender", "sendee", "message"]
|
|
* you might do something like '"{3}" sent from {2} to {1}'
|
|
* */
|
|
perRowDescription?: string;
|
|
/**A CSV of tags that is added to every row of the table (TODO: no template functionality currently)*/
|
|
perRowTags?: string;
|
|
/**Metadata about the columns*/
|
|
columnMeta?: ColumnMeta[];
|
|
/**Whether or not to aggregate to a single task (everything with the id value idValue)*/
|
|
aggregate?: boolean;
|
|
/**Names of the columns to aggregate with*/
|
|
aggregateColumns?: string[];
|
|
/**A metadata TaskTarget for this TaskTarget, if one exists*/
|
|
metaIdValue?: ValidId;
|
|
|
|
constructor(path: string){
|
|
this.path = path;
|
|
this.pipeline = [];
|
|
}
|
|
|
|
exists() {
|
|
return this.fsImpl.existsSync(this.path);
|
|
}
|
|
|
|
_joinPath(path: string) {
|
|
let finalPath = path;
|
|
if (!path.startsWith('/')) {
|
|
finalPath = nodePath.join(this.path, path)
|
|
}
|
|
return finalPath;
|
|
}
|
|
|
|
get basename() {
|
|
return safe(nodePath.basename(this.path));
|
|
}
|
|
basenameN(n: number) {
|
|
return this.path
|
|
.split("/")
|
|
.map(s => safe(s))
|
|
.slice(-n)
|
|
.join("___");
|
|
}
|
|
|
|
get id() {
|
|
assert(this.idValue, `TaskTarget for path "${this.path}" must have an id`);
|
|
if (typeof this.idValue === "function") {
|
|
return safe(this.idValue(this));
|
|
}
|
|
return safe(this.idValue);
|
|
}
|
|
get metaId() {
|
|
if (!this.metaIdValue) {
|
|
return undefined;
|
|
}
|
|
if (typeof this.metaIdValue === "function") {
|
|
return safe(this.metaIdValue(this));
|
|
}
|
|
return safe(this.metaIdValue);
|
|
}
|
|
|
|
/**Changes the current directory of the target*/
|
|
cd(path: string): TaskTarget {
|
|
this.path = this._joinPath(path);
|
|
return this;
|
|
}
|
|
/**Unzips the file pointed to by the current TaskTarget*/
|
|
async unzip(): Promise<TaskTarget> {
|
|
const zfs = new ZipFS(this.path);
|
|
await zfs.init();
|
|
this.path = ""; // target is now rooted at the base of its respective zipfs
|
|
this.fsImpl = zfs.getImpl() as any;
|
|
return this;
|
|
}
|
|
|
|
/**Get a glob off of the target*/
|
|
glob(globPath: string): TaskTarget[] {
|
|
globPath = this._joinPath(globPath);
|
|
const items = this.fsImpl.globSync(globPath);
|
|
const ret = items.map(i => new TaskTarget(i));
|
|
// TODO: This should probably clone()
|
|
ret.forEach(t => t.fsImpl = this.fsImpl); // Should all use the same fsImpl
|
|
return ret;
|
|
}
|
|
|
|
/**Clones the TaskTarget*/
|
|
clone(): TaskTarget {
|
|
const t = new TaskTarget(this.path);
|
|
t.fsImpl = this.fsImpl; // holds no state, just needs same impl
|
|
t.pipeline = this.pipeline.slice()
|
|
.map(p => p.clone());
|
|
// metadata
|
|
t.idValue = this.idValue;
|
|
t.perRowDescription = this.perRowDescription;
|
|
t.perRowTags = this.perRowTags;
|
|
t.columnMeta = this.columnMeta?.slice();
|
|
t.metaIdValue = this.metaIdValue;
|
|
t.aggregate = this.aggregate;
|
|
t.aggregateColumns = this.aggregateColumns?.slice();
|
|
return t;
|
|
}
|
|
|
|
pushToPipeline(v: TaskTargetOp) {
|
|
if (v.type === "read") {
|
|
assert(this.pipeline.length === 0, "A read can only be the first item in a pipeline");
|
|
}
|
|
|
|
this.pipeline.push(v);
|
|
}
|
|
|
|
toShell() {
|
|
const shell = this.pipeline
|
|
.map(p => p.toShell(this))
|
|
.filter(p => !!p) // remove empty strings and undefined
|
|
.join(" | ")
|
|
return shell;
|
|
}
|
|
|
|
cmd(cmd: ValidCmd) {
|
|
this.pushToPipeline(new TaskTargetCmd(cmd));
|
|
return this;
|
|
}
|
|
read() {
|
|
this.pushToPipeline(new TaskTargetRead());
|
|
return this;
|
|
}
|
|
assignMeta(meta: Partial<TaskTargetMeta>) {
|
|
Object.assign(this, {
|
|
...meta,
|
|
// Clone this deeply so no shared object references
|
|
columnMeta: meta.columnMeta?.slice()
|
|
});
|
|
return this;
|
|
}
|
|
}
|
|
|
|
/**A pipeline stage: maps a set of targets to a new set, sync or async*/
export interface PipelineOp {
  (targets: TaskTarget[]): TaskTarget[] | Promise<TaskTarget[]>;
}
export function cd(path: string): PipelineOp {
|
|
return (targets: TaskTarget[]) => targets.map(t => t.clone().cd(path));
|
|
}
|
|
export function glob(globPath: string): PipelineOp {
|
|
return (targets: TaskTarget[]) => targets.map(t => t.glob(globPath)).flat();
|
|
}
|
|
export function unzip(): PipelineOp {
|
|
return async (targets: TaskTarget[]) => Promise.all(targets.map(t => t.unzip()));
|
|
}
|
|
export function read(): PipelineOp {
|
|
return (targets: TaskTarget[]) => targets.map(t => t.clone().read())
|
|
}
|
|
export function cmd(cmd: ValidCmd): PipelineOp {
|
|
return (targets: TaskTarget[]) => targets.map(t => t.clone().cmd(cmd))
|
|
}
|
|
export function assignMeta(meta: Partial<TaskTargetMeta>): PipelineOp {
|
|
return (targets: TaskTarget[]) => targets.map(t => t.clone().assignMeta(meta))
|
|
}
|
|
|
|
export function each(fn: (t: TaskTarget)=>TaskTarget): PipelineOp {
|
|
return (targets: TaskTarget[])=> targets.map(fn);
|
|
}
|
|
export function pipe(...ops: PipelineOp[]): PipelineOp {
|
|
return async (targets: TaskTarget[]) => {
|
|
for (const op of ops) {
|
|
targets = await op(targets);
|
|
}
|
|
return targets;
|
|
};
|
|
}
|
|
export function branch(...ops: PipelineOp[]): PipelineOp {
|
|
return async (targets: TaskTarget[]) => {
|
|
const targetsArrays = await Promise.all(ops.map(op => op(targets)));
|
|
return targetsArrays.flat();
|
|
};
|
|
}
|
|
export function branchGen(genFn: ()=>Generator<PipelineOp>): PipelineOp {
|
|
const opsToBranch = Array.from(genFn());
|
|
return (targets: TaskTarget[]) => {
|
|
return branch(...opsToBranch)(targets);
|
|
};
|
|
}
|
|
|
|
export async function execPaths(entries: ({path: string, op: PipelineOp })[]) {
|
|
return (await Promise.all(
|
|
// Map every entry path into a TaskTarget and run the PipelineOp with
|
|
// that TaskTarget
|
|
entries
|
|
.map(async ({path,op})=>{
|
|
const targets = [new TaskTarget(path)];
|
|
return await op(targets);
|
|
})
|
|
)).flat();
|
|
}
|
|
|
|
|
|
/**Verify, anything that fails is skipped and throws an error*/
|
|
export async function verify(targets: TaskTarget[]) {
|
|
const outTargets: TaskTarget[] = [];
|
|
for (const t of targets) {
|
|
// Make sure fsImpl is ready
|
|
// TODO: DO NOT PUT THIS IN VERIFY, this should go somewhere in the task building stuff...
|
|
if ("ready" in t.fsImpl && !t.fsImpl.ready && t.fsImpl.init) {
|
|
await t.fsImpl.init();
|
|
}
|
|
// TODO: Probably remove or assert as incorrect
|
|
if (t.pipeline.length <= 0) {
|
|
continue; // Tasks with empty pipelines are no-ops, remove
|
|
}
|
|
if (!t.exists()) {
|
|
console.warn(`Missing target ${t.path}`);
|
|
continue;
|
|
}
|
|
|
|
outTargets.push(t);
|
|
}
|
|
|
|
return outTargets;
|
|
}
|
|
|
|
/**Combined output of several processes aggregated under one id*/
export interface ProcessOutputAggregate {
  // Concatenation of member stdouts (first member gets a CSV header prefix)
  stdout: string;
  // Concatenation of member stderrs
  stderr: string;
  // Exit code of each member process, in combine order
  exitCodes: (number | null)[];
  // Sum of member durations
  duration: number;
  // True only when every member reported ok
  ok: boolean;
}
/**Shape-compatible stand-in for zx's ProcessOutput, used for synthetic results*/
export interface ProcessOutputSimple {
  stdout: string;
  stderr: string;
  exitCode: number;
  duration: number;
  ok: boolean;
}
function combineProcessOutputAggregate(poa: ProcessOutputAggregate | undefined, t: TaskTarget, po: ProcessOutput) {
|
|
if (!poa) {
|
|
assert(t.aggregateColumns, "aggregate TaskTarget must have aggregateColumns");
|
|
const headers = t.aggregateColumns.join(",") + "\n";
|
|
return {
|
|
stdout: headers + po.stdout,
|
|
stderr: po.stderr,
|
|
exitCodes: [po.exitCode],
|
|
duration: po.duration,
|
|
ok: po.ok
|
|
};
|
|
}
|
|
|
|
// Comes with a builtin "\n" from jq on stdout and stderr, no need to add
|
|
// a trailing one
|
|
poa.stdout += po.stdout;
|
|
poa.stderr += po.stderr;
|
|
poa.exitCodes.push(po.exitCode);
|
|
poa.duration += po.duration;
|
|
poa.ok &&= po.ok;
|
|
return poa;
|
|
}
|
|
|
|
/**Pairing of a TaskTarget with the (real, aggregated, or synthetic) result of running it*/
export interface RunOutput {
  target: TaskTarget,
  result: ProcessOutput | ProcessOutputAggregate | ProcessOutputSimple
}
export async function run(target: TaskTarget): Promise<ProcessOutput> {
|
|
const command = target.toShell();
|
|
return await $({ nothrow: true })`bash -c ${command}`;
|
|
}
|
|
|
|
/**Run every target in parallel, then fold the results into three groups:
 * aggregate outputs (grouped by id), per-target outputs, and one synthetic
 * metadata-table output appended last.
 */
export async function runAll(targets: TaskTarget[]): Promise<RunOutput[]> {
  const finalTargets = await verify(targets);
  const results = await parallel(finalTargets, run, true);

  const nonAggregateTargets: TaskTarget[] = finalTargets.filter(t => !t.aggregate);
  const nonAggregateResults: RunOutput[] = [];
  // Keyed by aggregate id; each value accumulates the combined output
  const aggregateResultsMap: Record<string, RunOutput> = {};

  // == Aggregate tables ==
  // Some TaskTargets have .aggregate: true, which means they should all be combined
  // into a single task with the id of the .id property
  // (results is index-aligned with finalTargets)
  for (const [idx, r] of results.entries()) {
    const t = finalTargets[idx];
    if (!t.aggregate) {
      nonAggregateResults.push({
        target: t,
        result: r
      });
      continue;
    }
    const aggregateId = t.id;
    const prevResult = aggregateResultsMap[aggregateId]?.result;
    aggregateResultsMap[aggregateId] = {
      target: t, // Use target t for metadata, so it will use the last target
      result: combineProcessOutputAggregate(prevResult as (ProcessOutputAggregate | undefined), t, r)
    };
  }

  // == Metadata table ==
  // Each TaskTarget has things like perRowDescription and other things we want to store
  // and output. this creates a single TaskTarget for all that perTable metadata
  // Minimal CSV field escaping: quote only when the value contains ", comma or newline
  function csvEscape(s: string | undefined) {
    if (s === undefined) {
      return "";
    }
    if (s.includes("\"") || s.includes(",") || s.includes("\n")) {
      return `"${s.replace(/\"/g, "\"\"")}"`;
    }
    return s;
  }
  let metadataCSV = "id,perRowDescription,perRowTags,columnMeta,metaId\n";
  // NOTE(review): only non-aggregate targets get metadata rows here; aggregate
  // targets' metadata is never emitted — confirm this is intended
  for (const t of nonAggregateTargets) {
    const tableNamePart = t.id;
    const perRowDescriptionPart = t.perRowDescription;
    const perRowTagsPart = t.perRowTags;
    const columnMetaPart = t.columnMeta?.join(",") ?? "";
    const metaIdPart = t.metaId;
    metadataCSV += [
      csvEscape(tableNamePart),
      csvEscape(perRowDescriptionPart),
      csvEscape(perRowTagsPart),
      csvEscape(columnMetaPart),
      csvEscape(metaIdPart)
    ].join(",") + "\n";
  }
  // Won't be removed by verify() because we're adding it after that's used
  // TODO: Would be nice to bake this into TaskTarget/verify for tasks that dont point
  // to a real path
  const metadataTarget = new TaskTarget("<none>");
  metadataTarget
    // id, perRowDescription, perRowTags, columnMeta, metaId
    .assignMeta({
      idValue: "base_data_manager_metadata",
      columnMeta: ["any", "any", "any", "any", "any"],
      perRowTags: "internal",
    });
  // Synthetic "process output" carrying the metadata CSV on stdout
  const metadataResult= {
    stdout: metadataCSV,
    stderr: "",
    exitCode: 0,
    duration: 0, // TODO
    ok: true
  };
  const metadataRunOutput: RunOutput = { target: metadataTarget, result: metadataResult };

  const aggregateResults: RunOutput[] = Object.values(aggregateResultsMap);
  return aggregateResults.concat(nonAggregateResults).concat(metadataRunOutput);
}