// base-data-manager/data-export/task.ts

import nodePath from 'node:path';
import fs from 'node:fs';
import { strict as assert } from "node:assert";
import { ZipFS } from "./zipFs.ts";
import { $, ProcessOutput, quote } from "zx";
import { parallel } from "./parallel.ts";
// Turn off zx's verbose mode on the shared `$` used by run() below
$.verbose = false;
/**
 * Filesystem abstraction a TaskTarget uses to inspect its path.
 * Node's own `fs` module is the default implementation; TaskTarget.unzip()
 * swaps in the object returned by ZipFS.getImpl().
 */
type FSImpl = {
/**Truthy when paths live inside a zip archive (checked by TaskTargetRead.toShell)*/
isZip?: boolean;
/**Path of the root zip archive; TaskTargetRead.toShell asserts it is set when isZip is truthy*/
zipPath?: string;
/**Optional async initializer; verify() awaits it when `ready` is present and falsy*/
init?(): Promise<void>;
/**Initialization flag read by verify() — presumably maintained by the implementation itself; confirm*/
ready?: boolean;
globSync: typeof fs["globSync"];
statSync: typeof fs["statSync"];
existsSync: typeof fs["existsSync"];
// Required by glob
lstatSync: typeof fs["lstatSync"];
// Needs to include withFileTypes DirEnt variant
readdir: typeof fs["readdir"];
readdirSync: typeof fs["readdirSync"];
readlinkSync: typeof fs["readlinkSync"];
realpathSync: typeof fs["realpathSync"];
/**Promise-based counterparts of the calls above*/
promises: {
lstat: typeof fs.promises["lstat"];
// Needs to include withFileTypes DirEnt
readdir: typeof fs.promises["readdir"];
readlink: typeof fs.promises["readlink"];
realpath: typeof fs.promises["realpath"];
}
};
/**Filesystem implementation every new TaskTarget starts with: Node's built-in fs module*/
const defaultFSImpl = fs;
/**
 * Sanitizes a string for use as an identifier: every UTF-16 code unit that is
 * not an ASCII letter, digit or underscore is replaced with an underscore.
 */
function safe(s: string) {
  const disallowed = /[^a-zA-Z0-9_]/g;
  const sanitized = s.replace(disallowed, '_');
  return sanitized;
}
/**A single stage of a TaskTarget pipeline that can be rendered as a shell fragment*/
interface TaskTargetOp {
/**"read" ops may only be pushed onto an empty pipeline (asserted by TaskTarget.pushToPipeline); "mid" ops have no such restriction*/
type: "read" | "mid";
/**Renders this op for `target`; falsy results are dropped when TaskTarget.toShell joins the stages with " | "*/
toShell(target: TaskTarget): string | undefined;
/**Returns a copy of this op (used by TaskTarget.clone)*/
clone(): TaskTargetOp;
}
/**Pipeline op that writes the contents of the target's path to stdout*/
class TaskTargetRead implements TaskTargetOp {
  get type() {
    return "read" as const;
  }

  /**
   * Builds the shell command that emits the target's file on stdout:
   * `cat` for plain files, `7z x ... -so` for files inside a zip.
   * Throws (via assert) when a zip-backed target has no zipPath.
   */
  toShell(target: TaskTarget) {
    const { fsImpl, path } = target;
    if (!fsImpl.isZip) {
      return `cat ${quote(path)}`;
    }
    // The command must be issued against the root archive, so this leans on
    // fsImpl exposing the archive path it was created from
    assert(fsImpl.zipPath, "Should have a zipPath");
    return `7z x ${quote(fsImpl.zipPath)} -so ${quote(path)}`;
  }

  /**The op carries no state, so a copy is simply a new instance*/
  clone() {
    return new TaskTargetRead();
  }
}
/**A command accepted by TaskTargetCmd: a string (split on whitespace by TaskTargetCmd.parse), a [cmd, ...args] array, or a function of the target returning either*/
type ValidCmd = string | string[] | ((t: TaskTarget)=>string) | ((t: TaskTarget)=>string[]);
/**Pipeline op that runs an arbitrary command as one stage of the pipeline*/
class TaskTargetCmd implements TaskTargetOp {
  get type() {
    return "mid" as const;
  }

  /**What nodejs spawn() and execFile() take
   * [cmd, ...args]: string[]
   * (or a string / function form that parse() resolves to that shape)
   */
  cmd: ValidCmd;

  /**
   * Resolves any ValidCmd form to a [cmd, ...args] array.
   *
   * @param target Passed to `v` when it is a function
   * @param v The command in any accepted form
   * @returns The argument vector. String commands are split on runs of
   *   whitespace, ignoring leading/trailing whitespace so a padded or
   *   multi-line string never yields empty arguments. Array commands are
   *   returned as given.
   */
  static parse(target: TaskTarget, v: ValidCmd): string[] {
    const resolved = typeof v === "function" ? v(target) : v;
    if (typeof resolved === "string") {
      // split() emits "" for leading/trailing separators; drop those tokens,
      // otherwise they are quoted into the command line as empty words
      return resolved.split(/\s+/).filter(token => token.length > 0);
    }
    return resolved;
  }

  constructor(cmd: ValidCmd) {
    this.cmd = cmd;
  }

  /**
   * Renders the command as a single shell-quoted string.
   * Newlines are stripped from every argument before it is quoted.
   */
  toShell(target: TaskTarget) {
    const argv = TaskTargetCmd.parse(target, this.cmd);
    return argv
      .map(arg => quote(arg.replace(/\n/g, "")))
      .join(" ");
  }

  /**Copies the op; the cmd value itself is shared, not duplicated*/
  clone() {
    return new TaskTargetCmd(this.cmd);
  }
}
/**An id accepted for idValue / metaIdValue: a static string, or a function that derives the id from the target*/
type ValidId = string | ((t: TaskTarget)=>string);
/**Registry of the column types a ColumnMeta entry may name.
 * Within this file only the keys matter (ColumnMeta is `keyof typeof COLUMN_TYPES`);
 * every value is an empty object.
 */
export const COLUMN_TYPES = {
/**A numeric value*/
"numeric": {},
/**ISO Datetime*/
"isodatetime": {},
/**Urls*/
"url": {},
/**Freetext*/
"text": {},
/**For anything untyped*/
"any": {},
/**The sender/originator of a row (maps to Owner in Timelinize)*/
"sender": {},
/**The receiver/recipient of a row (maps to RelSent entity in Timelinize)*/
"receiver": {},
/**Latitude coordinate*/
"lat": {},
/**Longitude coordinate*/
"lng": {},
/**Placeholder key — presumably for columns whose type has not been decided yet; confirm*/
"TODO": {}
};
/**Column metadata: the name of one of the COLUMN_TYPES keys, or undefined*/
type ColumnMeta = (keyof typeof COLUMN_TYPES | undefined);
// Non-optional version of just the metadata properties of TaskTarget
// (consumed as Partial<TaskTargetMeta> by assignMeta)
type TaskTargetMeta = Required<Pick<TaskTarget, "idValue" | "perRowDescription" | "perRowTags" | "columnMeta" | "aggregate" | "metaIdValue" | "aggregateColumns">>;
/**
 * A path, the shell pipeline that turns that path into output, and the
 * user-assigned metadata describing that output.
 *
 * cd(), unzip(), cmd(), read() and assignMeta() mutate the instance and return
 * it for chaining; call clone() first to leave the original untouched.
 */
export class TaskTarget {
  /**The current path pointed to by this TaskTarget*/
  path: string;
  /**The fsImpl used to access the .path*/
  fsImpl: FSImpl = defaultFSImpl;
  /**The pipeline of things to do to the above path to get an stdout of the output*/
  pipeline: TaskTargetOp[];

  // == Metadata, user configurable, no good defaults ==

  /**Id of the TaskTarget
   * string - Static id
   * fn returning string - Function can derive the id from a task target even after a glob() and cd() operation
   **/
  idValue?: ValidId;
  /**For every output CSV, this defines a description of that CSV per-row
   * Use the items {0}, {1} to template
   * Example: For a CSV with a row format like ["time", "sender", "sendee", "message"]
   * you might do something like '"{3}" sent from {2} to {1}'
   * */
  perRowDescription?: string;
  /**A CSV of tags that is added to every row of the table (TODO: no template functionality currently)*/
  perRowTags?: string;
  /**Metadata about the columns*/
  columnMeta?: ColumnMeta[];
  /**Whether or not to aggregate to a single task (everything with the id value idValue)*/
  aggregate?: boolean;
  /**Names of the columns to aggregate with*/
  aggregateColumns?: string[];
  /**A metadata TaskTarget for this TaskTarget, if one exists*/
  metaIdValue?: ValidId;

  /**Creates a target at `path` with an empty pipeline and the default fsImpl*/
  constructor(path: string){
    this.path = path;
    this.pipeline = [];
  }

  /**Whether the current path exists according to this target's fsImpl*/
  exists() {
    return this.fsImpl.existsSync(this.path);
  }

  /**Resolves `path` against the current path; a path starting with "/" is returned unchanged*/
  _joinPath(path: string) {
    let finalPath = path;
    if (!path.startsWith('/')) {
      finalPath = nodePath.join(this.path, path)
    }
    return finalPath;
  }

  /**The last segment of the current path, sanitized with safe()*/
  get basename() {
    return safe(nodePath.basename(this.path));
  }

  /**The last `n` "/"-separated segments of the current path, each sanitized with safe() and joined with "___"*/
  basenameN(n: number) {
    return this.path
      .split("/")
      .map(s => safe(s))
      .slice(-n)
      .join("___");
  }

  /**The sanitized id of this target. Throws (via assert) when no idValue has been assigned*/
  get id() {
    assert(this.idValue, `TaskTarget for path "${this.path}" must have an id`);
    if (typeof this.idValue === "function") {
      return safe(this.idValue(this));
    }
    return safe(this.idValue);
  }

  /**The sanitized metadata id, or undefined when no metaIdValue has been assigned*/
  get metaId() {
    if (!this.metaIdValue) {
      return undefined;
    }
    if (typeof this.metaIdValue === "function") {
      return safe(this.metaIdValue(this));
    }
    return safe(this.metaIdValue);
  }

  /**Changes the current directory of the target*/
  cd(path: string): TaskTarget {
    this.path = this._joinPath(path);
    return this;
  }

  /**Unzips the file pointed to by the current TaskTarget*/
  async unzip(): Promise<TaskTarget> {
    const zfs = new ZipFS(this.path);
    await zfs.init();
    this.path = ""; // target is now rooted at the base of its respective zipfs
    this.fsImpl = zfs.getImpl() as any;
    return this;
  }

  /**Get a glob off of the target.
   * The returned targets share this target's fsImpl but start with an empty
   * pipeline and no metadata.
   */
  glob(globPath: string): TaskTarget[] {
    globPath = this._joinPath(globPath);
    const items = this.fsImpl.globSync(globPath);
    const ret = items.map(i => new TaskTarget(i));
    // TODO: This should probably clone()
    ret.forEach(t => t.fsImpl = this.fsImpl); // Should all use the same fsImpl
    return ret;
  }

  /**Clones the TaskTarget*/
  clone(): TaskTarget {
    const t = new TaskTarget(this.path);
    t.fsImpl = this.fsImpl; // holds no state, just needs same impl
    t.pipeline = this.pipeline.slice()
      .map(p => p.clone());
    // metadata
    t.idValue = this.idValue;
    t.perRowDescription = this.perRowDescription;
    t.perRowTags = this.perRowTags;
    t.columnMeta = this.columnMeta?.slice();
    t.metaIdValue = this.metaIdValue;
    t.aggregate = this.aggregate;
    t.aggregateColumns = this.aggregateColumns?.slice();
    return t;
  }

  /**Appends an op to the pipeline. Throws (via assert) when a "read" op is pushed onto a non-empty pipeline*/
  pushToPipeline(v: TaskTargetOp) {
    if (v.type === "read") {
      assert(this.pipeline.length === 0, "A read can only be the first item in a pipeline");
    }
    this.pipeline.push(v);
  }

  /**Renders the whole pipeline as one shell command, stages joined with " | "*/
  toShell() {
    const shell = this.pipeline
      .map(p => p.toShell(this))
      .filter(p => !!p) // remove empty strings and undefined
      .join(" | ")
    return shell;
  }

  /**Appends a command stage to the pipeline*/
  cmd(cmd: ValidCmd) {
    this.pushToPipeline(new TaskTargetCmd(cmd));
    return this;
  }

  /**Appends the read stage; only valid while the pipeline is still empty*/
  read() {
    this.pushToPipeline(new TaskTargetRead());
    return this;
  }

  /**
   * Copies the given metadata fields onto this target and returns `this`.
   *
   * Only keys actually present in `meta` are written, so assigning one field
   * never resets another (previously every call overwrote columnMeta, wiping
   * it whenever `meta` did not carry one). Array values are copied so the
   * target never shares them with the caller.
   */
  assignMeta(meta: Partial<TaskTargetMeta>) {
    const patch: Partial<TaskTargetMeta> = { ...meta };
    if (meta.columnMeta) {
      patch.columnMeta = meta.columnMeta.slice();
    }
    if (meta.aggregateColumns) {
      patch.aggregateColumns = meta.aggregateColumns.slice();
    }
    Object.assign(this, patch);
    return this;
  }
}
/**A step that maps a list of targets to a new list of targets, synchronously or asynchronously.
 * Steps are composed with pipe() and branch().
 */
export interface PipelineOp {
(targets: TaskTarget[]): TaskTarget[] | Promise<TaskTarget[]>;
}
/**PipelineOp: returns a clone of every target with `path` applied through TaskTarget.cd()*/
export function cd(path: string): PipelineOp {
  return function cdAll(targets: TaskTarget[]) {
    return targets.map(target => {
      const moved = target.clone();
      return moved.cd(path);
    });
  };
}
/**PipelineOp: expands every target through TaskTarget.glob() and flattens the matches into one list*/
export function glob(globPath: string): PipelineOp {
  return function globAll(targets: TaskTarget[]) {
    return targets.flatMap(target => target.glob(globPath));
  };
}
/**
 * PipelineOp: unzips a clone of every target and resolves once all are ready.
 *
 * TaskTarget.unzip() re-roots the target it is called on (path and fsImpl are
 * replaced), so each target is cloned first — matching cd(), read(), cmd() and
 * assignMeta() — to keep the input targets intact when they are shared, e.g.
 * across the arms of a branch().
 */
export function unzip(): PipelineOp {
  return async (targets: TaskTarget[]) => Promise.all(targets.map(t => t.clone().unzip()));
}
/**PipelineOp: returns a clone of every target with the read stage appended through TaskTarget.read()*/
export function read(): PipelineOp {
  return function readAll(targets: TaskTarget[]) {
    return targets.map(target => {
      const copy = target.clone();
      return copy.read();
    });
  };
}
/**PipelineOp: returns a clone of every target with `cmd` appended as a pipeline stage through TaskTarget.cmd()*/
export function cmd(cmd: ValidCmd): PipelineOp {
  return function cmdAll(targets: TaskTarget[]) {
    return targets.map(target => {
      const copy = target.clone();
      return copy.cmd(cmd);
    });
  };
}
/**PipelineOp: returns a clone of every target with `meta` applied through TaskTarget.assignMeta()*/
export function assignMeta(meta: Partial<TaskTargetMeta>): PipelineOp {
  return function assignMetaAll(targets: TaskTarget[]) {
    return targets.map(target => {
      const copy = target.clone();
      return copy.assignMeta(meta);
    });
  };
}
/**PipelineOp: maps every target through `fn`. Targets are handed to `fn` as-is (no clone is made here)*/
export function each(fn: (t: TaskTarget)=>TaskTarget): PipelineOp {
  const applyToEach: PipelineOp = function (targets: TaskTarget[]) {
    return targets.map(fn);
  };
  return applyToEach;
}
/**
 * Composes ops sequentially: each op receives the (awaited) output of the
 * previous one. With no ops the input targets are returned unchanged.
 */
export function pipe(...ops: PipelineOp[]): PipelineOp {
  return async function runInSequence(targets: TaskTarget[]) {
    let current = targets;
    for (const stage of ops) {
      current = await stage(current);
    }
    return current;
  };
}
/**
 * Fans the same input targets out to every op and concatenates the results
 * in op order once all ops have finished.
 */
export function branch(...ops: PipelineOp[]): PipelineOp {
  return async function runBranches(targets: TaskTarget[]) {
    const pending = ops.map(op => op(targets));
    const perBranch = await Promise.all(pending);
    return perBranch.flat();
  };
}
/**
 * Like branch(), but the ops come from a generator.
 * The generator is drained once, when branchGen() is called, not on every run.
 */
export function branchGen(genFn: ()=>Generator<PipelineOp>): PipelineOp {
  const collectedOps = [...genFn()];
  return function runGeneratedBranches(targets: TaskTarget[]) {
    const branched = branch(...collectedOps);
    return branched(targets);
  };
}
/**
 * Seeds one TaskTarget per entry path, runs that entry's op on it, and
 * returns the targets produced by all entries as one flat list (entry order).
 */
export async function execPaths(entries: ({path: string, op: PipelineOp })[]) {
  // async so that an op throwing synchronously becomes a rejection of its own
  // entry instead of interrupting the mapping of the remaining entries
  const runEntry = async (entry: { path: string, op: PipelineOp }) => {
    const seed = [new TaskTarget(entry.path)];
    return await entry.op(seed);
  };
  const perEntry = await Promise.all(entries.map(entry => runEntry(entry)));
  return perEntry.flat();
}
/**
 * Filters targets down to the ones that can actually be run.
 * - Targets with an empty pipeline are dropped silently.
 * - Targets whose path does not exist are dropped with a console warning.
 * Nothing is thrown for a dropped target. As a side effect, an fsImpl that
 * exposes a falsy `ready` flag is initialized before its target is checked.
 */
export async function verify(targets: TaskTarget[]) {
  const runnable: TaskTarget[] = [];
  for (const target of targets) {
    // Make sure fsImpl is ready
    // TODO: DO NOT PUT THIS IN VERIFY, this should go somewhere in the task building stuff...
    const impl = target.fsImpl;
    if ("ready" in impl && !impl.ready) {
      if (impl.init) {
        await impl.init();
      }
    }

    // TODO: Probably remove or assert as incorrect
    const hasStages = target.pipeline.length > 0;
    if (!hasStages) {
      continue; // Tasks with empty pipelines are no-ops, remove
    }

    if (target.exists()) {
      runnable.push(target);
    } else {
      console.warn(`Missing target ${target.path}`);
    }
  }
  return runnable;
}
/**Combined result of several process runs that share an aggregate id (built by combineProcessOutputAggregate)*/
export interface ProcessOutputAggregate {
/**Header line made from the target's aggregateColumns, followed by the concatenated stdout of every combined run*/
stdout: string;
/**Concatenated stderr of every combined run*/
stderr: string;
/**Exit code of each combined run, in the order they were combined*/
exitCodes: (number | null)[];
/**Sum of the durations of the combined runs*/
duration: number;
/**True only if every combined run was ok*/
ok: boolean;
}
/**Minimal result shape for output that is not backed by a zx ProcessOutput; runAll uses it for its synthetic metadata table*/
export interface ProcessOutputSimple {
stdout: string;
stderr: string;
exitCode: number;
duration: number;
ok: boolean;
}
/**
 * Folds one process result into the running aggregate for its id.
 *
 * @param poa The aggregate so far, or undefined for the first result
 * @param t The target that produced `po`; its aggregateColumns supply the
 *   header line when the aggregate is started
 * @param po The process result to add
 * @returns A new aggregate when `poa` is undefined, otherwise `poa` itself,
 *   updated in place
 * @throws AssertionError when starting an aggregate for a target without aggregateColumns
 */
function combineProcessOutputAggregate(poa: ProcessOutputAggregate | undefined, t: TaskTarget, po: ProcessOutput) {
  if (poa) {
    // Comes with a builtin "\n" from jq on stdout and stderr, no need to add
    // a trailing one
    poa.stdout += po.stdout;
    poa.stderr += po.stderr;
    poa.exitCodes.push(po.exitCode);
    poa.duration += po.duration;
    poa.ok = poa.ok && po.ok;
    return poa;
  }

  // First result for this id: start the aggregate with the header line
  assert(t.aggregateColumns, "aggregate TaskTarget must have aggregateColumns");
  const headerLine = t.aggregateColumns.join(",") + "\n";
  return {
    stdout: headerLine + po.stdout,
    stderr: po.stderr,
    exitCodes: [po.exitCode],
    duration: po.duration,
    ok: po.ok
  };
}
/**One entry of runAll()'s output: a target paired with the result produced for it*/
export interface RunOutput {
/**The target the result belongs to (for aggregates: the last target combined under that id)*/
target: TaskTarget,
/**A raw zx ProcessOutput, an aggregate of several runs, or the synthetic metadata result*/
result: ProcessOutput | ProcessOutputAggregate | ProcessOutputSimple
}
/**
 * Executes the target's pipeline with `bash -c`.
 * The zx call is made with `nothrow: true`, so the ProcessOutput is returned
 * for the caller to inspect.
 */
export async function run(target: TaskTarget): Promise<ProcessOutput> {
  const shellCommand = target.toShell();
  const tolerantShell = $({ nothrow: true });
  return await tolerantShell`bash -c ${shellCommand}`;
}
/**
 * Verifies the targets, runs them through parallel(), and assembles the final outputs.
 *
 * The returned list contains, in this order:
 * 1. one combined output per aggregate id (targets with `.aggregate` set),
 * 2. one output per non-aggregate target,
 * 3. a synthetic "base_data_manager_metadata" CSV table with one row of
 *    per-table metadata for every non-aggregate target.
 */
export async function runAll(targets: TaskTarget[]): Promise<RunOutput[]> {
  const finalTargets = await verify(targets);
  const results = await parallel(finalTargets, run, true);
  const nonAggregateTargets: TaskTarget[] = finalTargets.filter(t => !t.aggregate);

  // == Aggregate tables ==
  // Some TaskTargets have .aggregate: true, which means they should all be combined
  // into a single task with the id of the .id property
  const standaloneOutputs: RunOutput[] = [];
  const aggregatedById: Record<string, RunOutput> = {};
  for (const [idx, r] of results.entries()) {
    const t = finalTargets[idx];
    if (t.aggregate) {
      const key = t.id;
      const soFar = aggregatedById[key]?.result as (ProcessOutputAggregate | undefined);
      aggregatedById[key] = {
        target: t, // Use target t for metadata, so it will use the last target
        result: combineProcessOutputAggregate(soFar, t, r)
      };
    } else {
      standaloneOutputs.push({ target: t, result: r });
    }
  }

  // == Metadata table ==
  // Each TaskTarget has things like perRowDescription and other things we want to store
  // and output. this creates a single TaskTarget for all that perTable metadata
  const needsQuoting = (s: string) => s.includes("\"") || s.includes(",") || s.includes("\n");
  const csvEscape = (s: string | undefined) => {
    if (s === undefined) {
      return "";
    }
    return needsQuoting(s) ? `"${s.replace(/\"/g, "\"\"")}"` : s;
  };
  const metadataRows = nonAggregateTargets.map(t => {
    const cells = [
      t.id,
      t.perRowDescription,
      t.perRowTags,
      t.columnMeta?.join(",") ?? "",
      t.metaId
    ];
    return cells.map(cell => csvEscape(cell)).join(",") + "\n";
  });
  const metadataCSV = "id,perRowDescription,perRowTags,columnMeta,metaId\n" + metadataRows.join("");

  // Won't be removed by verify() because we're adding it after that's used
  // TODO: Would be nice to bake this into TaskTarget/verify for tasks that dont point
  // to a real path
  const metadataTarget = new TaskTarget("<none>");
  // id, perRowDescription, perRowTags, columnMeta, metaId
  metadataTarget.assignMeta({
    idValue: "base_data_manager_metadata",
    columnMeta: ["any", "any", "any", "any", "any"],
    perRowTags: "internal",
  });
  const metadataOutput: RunOutput = {
    target: metadataTarget,
    result: {
      stdout: metadataCSV,
      stderr: "",
      exitCode: 0,
      duration: 0, // TODO
      ok: true
    }
  };

  return [...Object.values(aggregatedById), ...standaloneOutputs, metadataOutput];
}