410 lines
No EOL
11 KiB
TypeScript
410 lines
No EOL
11 KiB
TypeScript
import nodePath from 'node:path';
|
|
import fs from 'node:fs';
|
|
import { strict as assert } from "node:assert";
|
|
import { ZipFS } from "./zipFs.ts";
|
|
import { globSync } from "glob";
|
|
import { $, ProcessPromise, quote } from "zx";
|
|
|
|
/**
 * The subset of Node's `fs` API that TaskTarget and glob need.
 * Node's own `fs` module is used as the default implementation; the optional
 * fields are only consulted for other (e.g. zip-backed) implementations.
 */
type FSImpl = {
  // Optional extras for non-default implementations
  /** When true, target paths are entries inside the archive at `zipPath`. */
  isZip?: boolean;
  /** Archive file handed to `7z` when reading zip entries. */
  zipPath?: string;
  /** Async set-up; verify() awaits it when `ready` is present and false. */
  init?(): Promise<void>;
  /** Presumably flips to true once init() has run — NOTE(review): confirm in the zip impl. */
  ready?: boolean;

  statSync: typeof fs.statSync;
  // Backs TaskTarget.exists()
  existsSync: typeof fs.existsSync;

  // Everything below is required by glob (passed through its `fs` option)
  lstatSync: typeof fs.lstatSync;
  // Must include the withFileTypes (Dirent) overloads
  readdir: typeof fs.readdir;
  readdirSync: typeof fs.readdirSync;
  readlinkSync: typeof fs.readlinkSync;
  realpathSync: typeof fs.realpathSync;
  promises: {
    lstat: typeof fs.promises.lstat;
    // Must include the withFileTypes (Dirent) overloads
    readdir: typeof fs.promises.readdir;
    readlink: typeof fs.promises.readlink;
    realpath: typeof fs.promises.realpath;
  };
};
|
|
// Default filesystem for every new TaskTarget: Node's own `fs` module. TaskTarget.unzip() replaces it per target with a zip-backed impl.
const defaultFSImpl = fs;
|
|
|
|
/** Any single UTF-16 code unit that is not an ASCII letter, digit or underscore. */
const UNSAFE_ID_CHAR = /[^a-zA-Z0-9_]/g;

/**
 * Makes `s` safe to use as an identifier / table name by replacing every
 * character outside `[a-zA-Z0-9_]` with an underscore.
 * Non-BMP characters (surrogate pairs) become two underscores.
 */
function safe(s: string) {
  const cleaned = s.replace(UNSAFE_ID_CHAR, '_');
  return cleaned;
}
|
|
|
|
|
|
/**
 * One stage of a TaskTarget's shell pipeline. Stages are rendered individually
 * and joined with " | ", so each later stage reads the previous stage's output.
 */
interface TaskTargetOp {
  /** "read" stages produce data and may only be the first stage; "mid" stages come after. */
  type: "read" | "mid";

  /** Renders this stage as shell text for `target`. */
  toShell(target: TaskTarget): string;

  /** Returns a fresh copy of this stage. */
  clone(): TaskTargetOp;
}
|
|
/** First pipeline stage: writes the target file's contents to stdout. */
class TaskTargetRead implements TaskTargetOp {
  get type() { return "read" as const; }

  /**
   * Builds the shell command that streams the target file to stdout.
   * Zip-backed targets extract entry `target.path` from `fsImpl.zipPath` with `7z x ... -so`;
   * all other targets use `cat`.
   * @throws AssertionError when the fsImpl is a zip but has no zipPath
   */
  toShell(target: TaskTarget) {
    if (target.fsImpl.isZip) {
      assert(target.fsImpl.zipPath, "Should have a zipPath");
      return `7z x ${quote(target.fsImpl.zipPath)} -so ${quote(target.path)}`;
    }

    // `--` ends option parsing, so a path starting with "-" is read as a file rather than a cat flag.
    // (Quoting alone does not help: the shell strips the quotes before cat sees the argument.)
    return `cat -- ${quote(target.path)}`;
  }

  /** Read stages hold no state, so a new instance is an exact copy. */
  clone() {
    return new TaskTargetRead();
  }
}
|
|
|
|
/**
 * A pipeline command: a whitespace-separated string, an argv array, or a
 * function of the target that returns either.
 */
type ValidCmd = string | string[] | ((t: TaskTarget)=>string) | ((t: TaskTarget)=>string[]);

/** A "mid" pipeline stage that runs a single command on the previous stage's output. */
class TaskTargetCmd implements TaskTargetOp {
  get type() { return "mid" as const; }

  /**
   * The command exactly as supplied. parse() resolves it to an argv array
   * (`[cmd, ...args]`, the form spawn()/execFile() take).
   */
  cmd: ValidCmd;

  /**
   * Resolves `v` into an argv array for `target`.
   * - Functions are called with `target` first.
   * - Strings are split on runs of whitespace. Leading/trailing whitespace is ignored,
   *   so no empty arguments are produced and an all-whitespace string gives `[]`.
   * - Arrays are returned as-is.
   */
  static parse(target: TaskTarget, v: ValidCmd): string[] {
    const resolved = typeof v === "function" ? v(target) : v;
    if (typeof resolved === "string") {
      // Without trim(), " jq ." would split to ["", "jq", "."] and the empty
      // token would be quoted into an empty command word.
      const trimmed = resolved.trim();
      return trimmed === "" ? [] : trimmed.split(/\s+/);
    }
    return resolved;
  }

  constructor(cmd: ValidCmd) {
    this.cmd = cmd;
  }

  /**
   * Renders the command as one shell-quoted line.
   * Newlines are removed from every argument before quoting.
   * NOTE(review): presumably so multi-line template-literal args collapse to one line — confirm.
   */
  toShell(target: TaskTarget) {
    return TaskTargetCmd.parse(target, this.cmd)
      .map(arg => {
        const sh = arg.replace(/\n/g, "");
        return quote(sh);
      })
      .join(" ");
  }

  /** Copies the stage; the command value itself is shared, not deep-copied. */
  clone() {
    return new TaskTargetCmd(this.cmd);
  }
}
|
|
|
|
/** How a TaskTarget's id is given: a fixed string, or a function computed from the target. */
type ValidId = string | ((t: TaskTarget)=>string);

/**
 * A path, on the real filesystem or inside an unzipped archive, plus the shell
 * pipeline to run against it, an id, and post-processing hooks.
 *
 * cd() and unzip() mutate this target in place and return it; clone() makes an
 * independent copy.
 */
export class TaskTarget {
  /** Target path. After unzip() it is "" (the archive root). */
  path: string;

  /** Filesystem used by exists() and glob(); Node's fs unless unzip() replaced it. */
  fsImpl: FSImpl = defaultFSImpl;

  /** Shell stages rendered by toShell(); a "read" stage may only come first. */
  pipeline: TaskTargetOp[];

  /** Source for the `id` getter; must be set (e.g. via setId()) before `id` is read. */
  idValue: ValidId | undefined;

  /** Async hooks registered through pushPostFn() / post(). */
  postFns: ((t: TaskTarget)=>Promise<void>)[];

  constructor(path: string) {
    this.path = path;
    this.pipeline = [];
    this.postFns = [];
  }

  /** Whether this target's path exists according to its fsImpl. */
  exists() {
    return this.fsImpl.existsSync(this.path);
  }

  /**
   * Resolves `path` against this target's path.
   * Paths starting with "/" are returned unchanged; anything else goes through nodePath.join.
   */
  _joinPath(path: string) {
    if (path.startsWith('/')) {
      return path;
    }
    return nodePath.join(this.path, path);
  }

  /** Last path component, made identifier-safe with safe(). */
  get basename() {
    return safe(nodePath.basename(this.path));
  }

  /** The last `n` "/"-separated path components, each made identifier-safe, joined with "___". */
  basenameN(n: number) {
    return this.path
      .split("/")
      .map(s => safe(s))
      .slice(-n)
      .join("___");
  }

  /**
   * Identifier-safe id of this target, taken from idValue (called with this
   * target when it is a function).
   * @throws AssertionError when no (non-empty) id has been set
   */
  get id() {
    assert(this.idValue, `TaskTarget for path "${this.path}" must have an id`);
    const raw = typeof this.idValue === "function" ? this.idValue(this) : this.idValue;
    return safe(raw);
  }

  /** Changes the current directory of the target (in place). */
  cd(path: string): TaskTarget {
    this.path = this._joinPath(path);
    return this;
  }

  /**
   * Unzips the file pointed to by the current TaskTarget (in place): afterwards the
   * target is rooted at "" inside the archive and uses the archive's fsImpl.
   */
  async unzip(): Promise<TaskTarget> {
    const zfs = new ZipFS(this.path);
    await zfs.init();
    this.path = ""; // target is now rooted at the base of its respective zipfs
    this.fsImpl = zfs.getImpl() as any;
    return this;
  }

  /**
   * Expands a glob relative to this target into one new TaskTarget per match.
   * Matches share this target's fsImpl but not its pipeline, id or post hooks.
   */
  glob(globPath: string): TaskTarget[] {
    const pattern = this._joinPath(globPath);
    const items = globSync(pattern, {
      // Zip-backed impls keep a dummy cwd: the real process cwd means nothing inside an
      // archive. On the real filesystem, relative patterns must resolve against the process
      // cwd (as exists() does); a nonexistent dummy cwd would make them match nothing.
      cwd: this.fsImpl.isZip ? '/DUMMYCWD' : process.cwd(),
      fs: this.fsImpl,
    });
    return items.map(item => {
      const target = new TaskTarget(item);
      // TODO: This should probably clone()
      target.fsImpl = this.fsImpl; // Should all use the same fsImpl
      return target;
    });
  }

  /** Clones the TaskTarget; pipeline stages are cloned, post hooks are copied by reference. */
  clone(): TaskTarget {
    const t = new TaskTarget(this.path);
    t.fsImpl = this.fsImpl; // holds no state, just needs same impl
    t.idValue = this.idValue;
    t.postFns = this.postFns.slice();
    t.pipeline = this.pipeline.map(p => p.clone());
    return t;
  }

  /**
   * Appends a stage to the pipeline.
   * @throws AssertionError when a "read" stage is pushed onto a non-empty pipeline
   */
  pushToPipeline(v: TaskTargetOp) {
    if (v.type === "read") {
      assert(this.pipeline.length === 0, "A read can only be the first item in a pipeline");
    }
    this.pipeline.push(v);
  }

  /** Renders the whole pipeline as one shell line (stages joined with " | "). */
  toShell() {
    return this.pipeline
      .map(p => p.toShell(this))
      .join(" | ");
  }

  /** Registers an async hook on this target. */
  pushPostFn(fn: ((t: TaskTarget)=>Promise<void>)) {
    this.postFns.push(fn);
  }

  /** Appends a command stage; chainable. */
  cmd(cmd: ValidCmd) {
    this.pushToPipeline(new TaskTargetCmd(cmd));
    return this;
  }

  /** Appends a read stage (must be the first stage); chainable. */
  read() {
    this.pushToPipeline(new TaskTargetRead());
    return this;
  }

  /** Sets the id source; chainable. */
  setId(idValue: ValidId) {
    this.idValue = idValue;
    return this;
  }

  /** Registers a post hook; chainable like the other builder methods. */
  post(fn: any) {
    this.pushPostFn(fn);
    return this;
  }

  types(
    types: string[]
  ) {
    // TODO:
    return this;
  }

  csvSink(
    summarization?: [string, string][]
  ) {
    // TODO:
    return this;

    // Ingest this csv into the database at the given id
    // this.cmd(t=>["sqlite-utils", "insert", "your.db", t.id, "-", "--csv", "--detect-types"]);
    // Add a post processing function for these targets that prints out the summarization
    // stats
    // this.post(async (t: TaskTarget)=>{
    //   // We only do the first one so far for the summarization
    //   let queryLine: string;
    //   let formatFn: (r: any)=>string;
    //   const [columnName, type] = summarization?.[0] ?? [undefined, undefined];
    //   if (type === "numeric") {
    //     queryLine = `min(${columnName}) as lo, max(${columnName}) as hi, count(*) as n`;
    //     formatFn = (r: any)=>`${r.n} rows from ${r.lo} to ${r.hi} for ${t.id}`;
    //   }
    //   else {
    //     queryLine = `count(*) as n`;
    //     formatFn = (r: any)=>`${r.n} rows for ${t.id}`;
    //   }
    //
    //   const cmd = "sqlite-utils";
    //   const args = ["query", "your.db", `select ${queryLine} from ${t.id}`]
    //   const { stdout, stderr } = await execFile(cmd, args);
    //   const results = JSON.parse(stdout);
    //   const result = results[0]; // should only be one result in the array for this type of query
    //   const logLine = formatFn(result);
    //   (t as any).log = logLine;
    // });
    //
    // return this;
  }
}
|
|
|
|
/** Invokes `fn` on every target, in array order; nothing is returned. */
export function each(targets: TaskTarget[], fn: (t: TaskTarget)=>void) {
  const it = targets.values();
  for (let step = it.next(); !step.done; step = it.next()) {
    fn(step.value);
  }
}
|
|
/** Returns a new plain array holding `fn(t)` for every target, in order. */
export function map(targets: TaskTarget[], fn: (t: TaskTarget)=>TaskTarget) {
  return Array.from(targets, (target) => fn(target));
}
|
|
/** Clones every target and moves each clone into `path`; the inputs are left untouched. */
export function cd(targets: TaskTarget[], path: string): TaskTarget[] {
  return targets.map((target) => {
    const copy = target.clone();
    return copy.cd(path);
  });
}
|
|
/** Expands `globPath` on every target and concatenates all matches into one flat array. */
export function glob(targets: TaskTarget[], globPath: string): TaskTarget[] {
  return targets.flatMap((target) => target.glob(globPath));
}
|
|
/**
 * Unzips every target, resolving to new targets rooted inside their archives.
 * Each target is cloned before unzipping, so the caller's targets keep their
 * original path and fsImpl. This matches cd(), read(), cmd() and setId(), which
 * also operate on clones; TaskTarget.unzip() itself mutates in place.
 */
export async function unzip(targets: TaskTarget[]): Promise<TaskTarget[]> {
  return Promise.all(targets.map(t => t.clone().unzip()));
}
|
|
/** Clones every target and appends a read stage to each clone. */
export function read(targets: TaskTarget[]): TaskTarget[] {
  return targets.map((target) => {
    const copy = target.clone();
    return copy.read();
  });
}
|
|
/** Clones every target and appends the command stage `cmd` to each clone. */
export function cmd(targets: TaskTarget[], cmd: ValidCmd): TaskTarget[] {
  return targets.map((target) => {
    const copy = target.clone();
    return copy.cmd(cmd);
  });
}
|
|
/** Clones every target and assigns the id source `id` to each clone. */
export function setId(targets: TaskTarget[], id: ValidId): TaskTarget[] {
  return targets.map((target) => {
    const copy = target.clone();
    return copy.setId(id);
  });
}
|
|
|
|
/**
 * Filters `targets` down to the ones that should actually be run. Nothing is thrown:
 * - targets with an empty pipeline are dropped silently (they would be no-ops);
 * - targets whose path does not exist are dropped with a `console.warn`.
 * A target's fsImpl is initialised (when it exposes `ready === false` and an `init`)
 * before its existence is checked.
 *
 * @returns the surviving targets, in their original order
 */
export async function verify(targets: TaskTarget[]): Promise<TaskTarget[]> {
  const outTargets: TaskTarget[] = [];
  for (const t of targets) {
    // Check the cheap condition first so we never initialise an fsImpl
    // (possibly opening an archive) for a target we are about to drop.
    // TODO: Probably remove or assert as incorrect
    if (t.pipeline.length <= 0) {
      continue; // Tasks with empty pipelines are no-ops, remove
    }

    // Make sure fsImpl is ready before exists() consults it
    if ("ready" in t.fsImpl && !t.fsImpl.ready && t.fsImpl.init) {
      await t.fsImpl.init();
    }

    if (!t.exists()) {
      console.warn(`Missing target ${t.path}`);
      continue;
    }

    outTargets.push(t);
  }
  return outTargets;
}
|
|
|
|
/**
 * Builds a manifest for GNU parallel: one line per target, formatted as
 * `<id>\t<shell>`, with lines separated by "\n" (no trailing newline).
 * @todo Enforce doing a verify before we output?
 */
export function getTSVManifest(targets: TaskTarget[]): string {
  const records = Array.from(targets, (target) => {
    // The shell is rendered before the id is read
    const shell = target.toShell();
    return `${target.id}\t${shell}`;
  });
  return records.join("\n");
}
|
|
|
|
/**
 * Returns the task manifest as `[id, shell]` pairs, one per target, in order.
 * Same content as getTSVManifest(), without the TSV encoding.
 */
export function getTaskManifest(targets: TaskTarget[]): [string, string][] {
  const out: [string, string][] = [];
  for (const t of targets) {
    // Rendered before the id is read, matching getTSVManifest()
    const shell = t.toShell();
    // A plain tuple literal: `as const` would make it readonly, which is not assignable to [string, string]
    out.push([t.id, shell]);
  }
  return out;
}
|
|
|
|
/**
 * If `a` belongs to a collection, transfers that membership to `b`:
 * `a` is removed from the set and loses its `__collection` pointer, and `b` is added and gains it.
 * Does nothing when `a` has no collection.
 */
function collectionSwap(a: TaskTargetPipelineHelper, b: TaskTargetPipelineHelper) {
  const collection = a.__collection;
  if (collection) {
    delete a.__collection;
    collection.delete(a);
    b.__collection = collection;
    collection.add(b);
  }
}
|
|
|
|
/**
 * An array of TaskTargets with chainable pipeline methods. Each method hands the
 * whole array to the module-level function of the same name and returns that
 * function's result, re-branded as a TaskTargetPipelineHelper.
 */
export class TaskTargetPipelineHelper extends Array<TaskTarget> {
  /** Set this pipeline was registered in by collect(); passed on to each new stage by collectionSwap(). */
  __collection?: Set<TaskTargetPipelineHelper>;

  /**
   * Re-brands `t` as a TaskTargetPipelineHelper by swapping its prototype in place
   * (no copy) and returns the very same array. Arrays that already carry this
   * prototype are returned untouched.
   */
  static pipeline(t: TaskTarget[]): TaskTargetPipelineHelper {
    const isHelper = Object.getPrototypeOf(t) === TaskTargetPipelineHelper.prototype;
    if (!isHelper) {
      Object.setPrototypeOf(t, TaskTargetPipelineHelper.prototype);
    }
    return t as any;
  }

  /** Runs a synchronous array transform and moves any collection membership onto its result. */
  _fn(fn: (t: TaskTarget[])=>TaskTarget[]): TaskTargetPipelineHelper {
    const before = TaskTargetPipelineHelper.pipeline(this);
    const after = TaskTargetPipelineHelper.pipeline(fn(before));
    // Only the newest stage stays in the collection, so collections always hold pipeline ends
    collectionSwap(before, after);
    return after;
  }

  /** Async counterpart of _fn(). */
  async _afn(fn: (t: TaskTarget[])=>Promise<TaskTarget[]>): Promise<TaskTargetPipelineHelper> {
    const before = TaskTargetPipelineHelper.pipeline(this);
    const after = TaskTargetPipelineHelper.pipeline(await fn(before));
    collectionSwap(before, after);
    return after;
  }

  cd(path: string): TaskTargetPipelineHelper {
    return this._fn((targets) => cd(targets, path));
  }

  glob(globPath: string): TaskTargetPipelineHelper {
    return this._fn((targets) => glob(targets, globPath));
  }

  async unzip(): Promise<TaskTargetPipelineHelper> {
    return this._afn((targets) => unzip(targets));
  }

  read(): TaskTargetPipelineHelper {
    // Resolves to the module-level read(), not this method
    return this._fn((targets) => read(targets));
  }

  cmd(_cmd: ValidCmd): TaskTargetPipelineHelper {
    return this._fn((targets) => cmd(targets, _cmd));
  }

  setId(id: ValidId): TaskTargetPipelineHelper {
    return this._fn((targets) => setId(targets, id));
  }

  /** Placeholder (TODO): accepts anything and returns this pipeline unchanged. */
  types(...args: any[]) {
    return this;
  }

  /** Placeholder (TODO): accepts anything and returns this pipeline unchanged. */
  csvSink(...args: any[]) {
    return this;
  }

  /**
   * Registers this pipeline in `_c`; later stages take over the membership.
   * @todo Nested versions of this don't currently work, but they could if we
   * turn __collection into an array of collections
   */
  collect(_c: Set<TaskTargetPipelineHelper>) {
    this.__collection = _c;
    return this;
  }
}
|
|
|
|
/**
 * Runs the target's rendered pipeline through `bash -c` using zx.
 *
 * `nothrow: true` means a non-zero exit status does not reject; callers must
 * inspect the returned output's exit code themselves.
 *
 * @returns the settled zx process output. Awaiting a ProcessPromise yields its
 * ProcessOutput, which is what `Awaited<ProcessPromise>` names. The previously
 * declared `Promise<ProcessPromise>` did not match the value actually returned.
 */
export async function run(target: TaskTarget): Promise<Awaited<ProcessPromise>> {
  const command = target.toShell();
  // zx escapes interpolated values, so the whole pipeline reaches bash as one -c argument
  return await $({ nothrow: true })`bash -c ${command}`;
}