// base-data-manager/data-export/task.ts
// (file-viewer listing info captured with this source: 410 lines, no EOL at end of file, 11 KiB, TypeScript)

import nodePath from 'node:path';
import fs from 'node:fs';
import { strict as assert } from "node:assert";
import { ZipFS } from "./zipFs.ts";
import { globSync } from "glob";
import { $, ProcessPromise, quote } from "zx";
/**
 * The slice of Node's `fs` API that a TaskTarget reads through. Node's `fs`
 * module is used as-is by default (`defaultFSImpl`); `TaskTarget.unzip()`
 * replaces it with the implementation returned by `ZipFS.getImpl()`, so
 * `exists()` and `glob()` then operate on the archive's contents.
 */
type FSImpl = {
// Set when paths refer to entries inside a zip; TaskTargetRead.toShell then extracts with 7z instead of cat
isZip?: boolean;
// Path of that archive on the real filesystem (asserted present by TaskTargetRead.toShell when isZip is set)
zipPath?: string;
// Optional async setup; verify() awaits it when `ready` is present and falsy
init?(): Promise<void>;
ready?: boolean;
statSync: typeof fs["statSync"];
// Used by TaskTarget.exists()
existsSync: typeof fs["existsSync"];
// Required by glob
lstatSync: typeof fs["lstatSync"];
// Must support the `withFileTypes` (Dirent) variant
readdir: typeof fs["readdir"];
readdirSync: typeof fs["readdirSync"];
readlinkSync: typeof fs["readlinkSync"];
realpathSync: typeof fs["realpathSync"];
// Promise-based variants; presumably also consumed by glob — confirm
promises: {
lstat: typeof fs.promises["lstat"];
// Must support the `withFileTypes` (Dirent) variant
readdir: typeof fs.promises["readdir"];
readlink: typeof fs.promises["readlink"];
realpath: typeof fs.promises["realpath"];
}
};
// Node's real filesystem; the fsImpl every new TaskTarget starts with
const defaultFSImpl = fs;
/**
 * Makes a string usable as an identifier (ids, basenames): every UTF-16 code
 * unit outside [A-Za-z0-9_] becomes "_". Distinct inputs can collide, e.g.
 * "a-b" and "a_b".
 */
function safe(s: string) {
  // Without the `u` flag, \W is exactly the complement of [A-Za-z0-9_], per code unit.
  const disallowed = /\W/g;
  return s.replace(disallowed, "_");
}
/**
 * One stage of a TaskTarget's shell pipeline. TaskTarget.toShell renders
 * every stage and joins the fragments with " | ".
 */
interface TaskTargetOp {
// "read": the source stage (TaskTargetRead), only allowed as the first stage (see TaskTarget.pushToPipeline)
// "mid": a command stage (TaskTargetCmd)
type: "read" | "mid";
// Render this stage as a shell fragment for the given target
toShell(target: TaskTarget): string;
// Independent copy, used when a TaskTarget is cloned
clone(): TaskTargetOp;
}
/**
 * Source stage of a pipeline: writes the target's bytes to stdout.
 * Targets on the real filesystem are read with `cat`; targets inside a zip
 * (`fsImpl.isZip`) are extracted to stdout with `7z x <archive> -so <entry>`.
 */
class TaskTargetRead implements TaskTargetOp {
  get type() {
    return "read" as const;
  }

  toShell(target: TaskTarget) {
    const impl = target.fsImpl;
    if (!impl.isZip) {
      return `cat ${quote(target.path)}`;
    }
    assert(impl.zipPath, "Should have a zipPath");
    // -so sends the extracted entry to stdout so it can feed the next stage
    return `7z x ${quote(impl.zipPath)} -so ${quote(target.path)}`;
  }

  clone() {
    return new TaskTargetRead();
  }
}
/**
 * A command for TaskTargetCmd: a whitespace-separated string, an argv array,
 * or a function of the target returning either form (resolved by
 * TaskTargetCmd.parse when the pipeline is rendered).
 */
type ValidCmd = string | string[] | ((t: TaskTarget)=>string) | ((t: TaskTarget)=>string[]);
/**
 * A command stage of a TaskTarget pipeline. It receives the previous stage's
 * stdout, because TaskTarget.toShell joins stages with " | ".
 */
class TaskTargetCmd implements TaskTargetOp {
  get type() {
    return "mid" as const;
  }

  /**
   * The command as given. It is resolved to argv form (`[cmd, ...args]`, what
   * Node's spawn()/execFile() take) by {@link TaskTargetCmd.parse} at render time.
   */
  cmd: ValidCmd;

  /**
   * Resolves a ValidCmd into an argv array for `target`.
   *
   * Functions are called with the target first. A string (given directly or
   * returned by the function) is split on runs of whitespace, and
   * leading/trailing whitespace is ignored. Arrays are returned unchanged,
   * including empty-string entries, which may be intentional arguments.
   */
  static parse(target: TaskTarget, v: ValidCmd): string[] {
    const resolved = typeof v === "function" ? v(target) : v;
    if (typeof resolved !== "string") {
      return resolved;
    }
    // " jq -r . ".split(/\s+/) yields "" at both ends, which would render as
    // empty quoted arguments and break the command; drop them.
    return resolved.split(/\s+/).filter(part => part.length > 0);
  }

  constructor(cmd: ValidCmd) {
    this.cmd = cmd;
  }

  /** Renders the command as individually shell-quoted words joined by spaces. */
  toShell(target: TaskTarget) {
    return TaskTargetCmd.parse(target, this.cmd)
      // Newlines are stripped from every word; presumably so a command never
      // spans lines in the newline-delimited output of getTSVManifest — confirm.
      .map(word => quote(word.replace(/\n/g, "")))
      .join(" ");
  }

  clone() {
    return new TaskTargetCmd(this.cmd);
  }
}
/**
 * A target id: a literal string, or a function computing one from the target.
 * Either form is passed through safe() by the TaskTarget.id getter.
 */
type ValidId = string | ((t: TaskTarget)=>string);
/**
 * A path — on the real filesystem, or inside a zip once `unzip()` has run —
 * plus the shell pipeline that will process it.
 *
 * Builder methods (`cd`, `unzip`, `cmd`, `read`, `setId`, `post`) mutate this
 * instance and return it so calls can be chained; call `clone()` first when
 * the original must be preserved.
 */
export class TaskTarget {
  /** Current path; relative paths are resolved against it by `_joinPath`. "" after `unzip()` (the archive root). */
  path: string;
  /** Filesystem implementation used by `exists()` and `glob()`. */
  fsImpl: FSImpl = defaultFSImpl;
  /** Shell stages; rendered and joined with " | " by `toShell()`. */
  pipeline: TaskTargetOp[];
  /** Raw id (string or function); read through the `id` getter. */
  idValue: ValidId | undefined;
  /** Callbacks registered with `post()`/`pushPostFn()`; not invoked anywhere in this class. */
  postFns: ((t: TaskTarget)=>Promise<void>)[];

  constructor(path: string) {
    this.path = path;
    this.pipeline = [];
    this.postFns = [];
  }

  /** Whether `path` exists according to the current fsImpl. */
  exists() {
    return this.fsImpl.existsSync(this.path);
  }

  /** Resolves `path` against the current path; paths starting with "/" are returned unchanged. */
  _joinPath(path: string) {
    let finalPath = path;
    if (!path.startsWith('/')) {
      finalPath = nodePath.join(this.path, path)
    }
    return finalPath;
  }

  /** Last path segment, made identifier-safe with `safe()`. */
  get basename() {
    return safe(nodePath.basename(this.path));
  }

  /**
   * The last `n` "/"-separated path segments, each made identifier-safe and
   * joined with "___". `n` should be >= 1: `slice(-0)` keeps every segment.
   */
  basenameN(n: number) {
    return this.path
      .split("/")
      .map(s => safe(s))
      .slice(-n)
      .join("___");
  }

  /**
   * The target's id (set via `setId()`), made identifier-safe with `safe()`.
   * Function ids are evaluated against this target on every read.
   * @throws AssertionError when no id has been set.
   */
  get id() {
    assert(this.idValue, `TaskTarget for path "${this.path}" must have an id`);
    if (typeof this.idValue === "function") {
      return safe(this.idValue(this));
    }
    return safe(this.idValue);
  }

  /** Changes the current directory of the target (in place). */
  cd(path: string): TaskTarget {
    this.path = this._joinPath(path);
    return this;
  }

  /**
   * Opens the zip at the current path with ZipFS and re-roots this target at
   * the archive's top level: `path` becomes "" and `fsImpl` becomes the zip
   * implementation.
   */
  async unzip(): Promise<TaskTarget> {
    const zfs = new ZipFS(this.path);
    await zfs.init();
    this.path = ""; // target is now rooted at the base of its respective zipfs
    this.fsImpl = zfs.getImpl() as any;
    return this;
  }

  /**
   * Expands `globPath` (resolved with `_joinPath`) through this target's
   * fsImpl and returns one new TaskTarget per match. Matches share this
   * target's fsImpl but start with an empty pipeline, no id and no post fns.
   */
  glob(globPath: string): TaskTarget[] {
    globPath = this._joinPath(globPath);
    const items = globSync(globPath, {
      // NOTE(review): fixed placeholder cwd, presumably so matches never depend
      // on process.cwd(); confirm how the zip fsImpl resolves it.
      cwd: '/DUMMYCWD',
      fs: this.fsImpl
    });
    const ret = items.map(i => new TaskTarget(i));
    // TODO: This should probably clone()
    ret.forEach(t => t.fsImpl = this.fsImpl); // Should all use the same fsImpl
    return ret;
  }

  /** Copies the target; pipeline stages are cloned, post fns are copied into a new array. */
  clone(): TaskTarget {
    const t = new TaskTarget(this.path);
    t.fsImpl = this.fsImpl; // holds no state, just needs same impl
    t.idValue = this.idValue;
    t.postFns = this.postFns.slice();
    t.pipeline = this.pipeline.slice()
      .map(p => p.clone());
    return t;
  }

  /**
   * Appends a stage to the pipeline.
   * @throws AssertionError when a "read" stage is added to a non-empty pipeline.
   */
  pushToPipeline(v: TaskTargetOp) {
    if (v.type === "read") {
      assert(this.pipeline.length === 0, "A read can only be the first item in a pipeline");
    }
    this.pipeline.push(v);
  }

  /** Renders the whole pipeline as one shell command line ("" when the pipeline is empty). */
  toShell() {
    const shell = this.pipeline
      .map(p => p.toShell(this))
      .join(" | ")
    return shell;
  }

  /** Registers a post-processing callback. */
  pushPostFn(fn: ((t: TaskTarget)=>Promise<void>)) {
    this.postFns.push(fn);
  }

  /** Appends a command stage (see ValidCmd for accepted forms). */
  cmd(cmd: ValidCmd) {
    this.pushToPipeline(new TaskTargetCmd(cmd));
    return this;
  }

  /** Appends the read stage; must be the first stage. */
  read() {
    this.pushToPipeline(new TaskTargetRead());
    return this;
  }

  /** Sets the raw id read by the `id` getter. */
  setId(idValue: ValidId) {
    this.idValue = idValue;
    return this;
  }

  /**
   * Registers a post-processing callback (see `pushPostFn`). Returns `this`
   * so it chains like `cmd()`, `read()` and `setId()`.
   */
  post(fn: any) {
    this.pushPostFn(fn);
    return this;
  }

  /** Placeholder for declaring column types; currently a no-op that returns `this`. */
  types(
    types: string[]
  ) {
    // TODO:
    return this;
  }

  /**
   * Placeholder for loading this target's output as CSV; currently a no-op
   * that returns `this`. The commented-out sketch below is the intended
   * sqlite-utils based implementation.
   */
  csvSink(
    summarization?: [string, string][]
  ) {
    // TODO:
    return this;
    // Ingest this csv into the database at the given id
    // this.cmd(t=>["sqlite-utils", "insert", "your.db", t.id, "-", "--csv", "--detect-types"]);
    // Add a post processing function for these targets that prints out the summarization
    // stats
    // this.post(async (t: TaskTarget)=>{
    // // We only do the first one so far for the summarization
    // let queryLine: string;
    // let formatFn: (r: any)=>string;
    // const [columnName, type] = summarization?.[0] ?? [undefined, undefined];
    // if (type === "numeric") {
    // queryLine = `min(${columnName}) as lo, max(${columnName}) as hi, count(*) as n`;
    // formatFn = (r: any)=>`${r.n} rows from ${r.lo} to ${r.hi} for ${t.id}`;
    // }
    // else {
    // queryLine = `count(*) as n`;
    // formatFn = (r: any)=>`${r.n} rows for ${t.id}`;
    // }
    // const cmd = "sqlite-utils";
    // const args = ["query", "your.db", `select ${queryLine} from ${t.id}`]
    // const { stdout, stderr } = await execFile(cmd, args);
    // const results = JSON.parse(stdout);
    // const result = results[0]; // should only be one result in the array for this type of query
    // const logLine = formatFn(result);
    // (t as any).log = logLine;
    // });
    // return this;
  }
}
/** Calls `fn` on every target, in array order, purely for its side effects. */
export function each(targets: TaskTarget[], fn: (t: TaskTarget)=>void) {
  for (const target of targets.values()) {
    fn(target);
  }
}
/**
 * Returns a new plain array holding `fn(t)` for each target, in order.
 * Array.from (unlike `targets.map`) always builds a plain Array, even when
 * `targets` is a TaskTargetPipelineHelper.
 */
export function map(targets: TaskTarget[], fn: (t: TaskTarget)=>TaskTarget) {
  return Array.from(targets, (target) => fn(target));
}
/** Returns clones of `targets` with their current directory changed to `path`; the inputs are untouched. */
export function cd(targets: TaskTarget[], path: string): TaskTarget[] {
  return targets.map((original) => {
    const copy = original.clone();
    return copy.cd(path);
  });
}
/** Expands `globPath` from every target and concatenates all matches into one array. */
export function glob(targets: TaskTarget[], globPath: string): TaskTarget[] {
  return targets.flatMap((target) => target.glob(globPath));
}
/**
 * Unzips every target (see TaskTarget.unzip) and resolves to the re-rooted
 * targets, in input order. Each target is cloned first, so the caller's
 * targets are left untouched, matching cd/read/cmd/setId.
 */
export async function unzip(targets: TaskTarget[]): Promise<TaskTarget[]> {
  return Promise.all(targets.map(t => t.clone().unzip()));
}
/** Returns clones of `targets`, each with a read (TaskTargetRead) stage appended. */
export function read(targets: TaskTarget[]): TaskTarget[] {
  return targets.map((target) => {
    const copy = target.clone();
    return copy.read();
  });
}
/** Returns clones of `targets`, each with a command stage for `cmd` appended. */
export function cmd(targets: TaskTarget[], cmd: ValidCmd): TaskTarget[] {
  return targets.map((target) => {
    const copy = target.clone();
    return copy.cmd(cmd);
  });
}
/** Returns clones of `targets` with their id set to `id`. */
export function setId(targets: TaskTarget[], id: ValidId): TaskTarget[] {
  return targets.map((target) => {
    const copy = target.clone();
    return copy.setId(id);
  });
}
/**
 * Filters `targets` down to the ones worth running. Targets that fail a check
 * are skipped rather than thrown on. For each target, in order:
 * - if its fsImpl has a falsy `ready` property and an `init()`, `init()` is
 *   awaited first (so a shared lazy fsImpl is initialized once);
 * - it is dropped silently when its pipeline is empty;
 * - it is dropped with a `console.warn` when `exists()` is false.
 * @returns the surviving targets, in their original order
 */
export async function verify(targets: TaskTarget[]): Promise<TaskTarget[]> {
  const outTargets: TaskTarget[] = [];
  for (const t of targets) {
    // Make sure fsImpl is ready
    if ("ready" in t.fsImpl && !t.fsImpl.ready && t.fsImpl.init) {
      await t.fsImpl.init();
    }
    // TODO: Probably remove or assert as incorrect
    if (t.pipeline.length <= 0) {
      continue; // Tasks with empty pipelines are no-ops, remove
    }
    if (!t.exists()) {
      console.warn(`Missing target ${t.path}`);
      continue;
    }
    outTargets.push(t);
  }
  return outTargets;
}
/**
 * Builds a manifest for parallel: one `<id>\t<shell>` line per target, lines
 * joined with "\n" (no trailing newline).
 * @todo Enforce doing a verify before we output?
 */
export function getTSVManifest(targets: TaskTarget[]): string {
  const lines = Array.from(targets, (t) => {
    // The shell is rendered before `id` is read (`id` throws when unset).
    const shell = t.toShell();
    return `${t.id}\t${shell}`;
  });
  return lines.join("\n");
}
/**
 * The same records as getTSVManifest, as `[id, shell]` pairs instead of TSV
 * text, in target order.
 */
export function getTaskManifest(targets: TaskTarget[]): [string, string][] {
  const out: [string, string][] = [];
  for (const t of targets) {
    // The shell is rendered before `id` is read (`id` throws when unset).
    const shell = t.toShell();
    // Plain tuple: `as const` would make it `readonly [string, string]`,
    // which is not assignable to this mutable array's element type.
    out.push([t.id, shell]);
  }
  return out;
}
/**
 * Moves `a`'s collection membership to `b`. If `a` is in a collection, `a`
 * is removed from it (and its back-pointer deleted), and `b` is added with
 * its back-pointer set. Does nothing when `a` is not in a collection.
 */
function collectionSwap(a: TaskTargetPipelineHelper, b: TaskTargetPipelineHelper) {
  const owner = a.__collection;
  if (owner) {
    delete a.__collection;
    owner.delete(a);
    b.__collection = owner;
    owner.add(b);
  }
}
/**
 * A TaskTarget[] with chainable versions of the module's free functions
 * (cd, glob, unzip, read, cmd, setId). Each transform returns a helper
 * wrapping that function's result.
 *
 * A pipeline can be registered in a Set with `collect()`. Every later
 * transform moves that membership from its input to its result (see
 * collectionSwap), so the Set tracks the latest stage of each collected chain.
 */
export class TaskTargetPipelineHelper extends Array<TaskTarget> {
  /** The Set this pipeline is currently registered in via collect(), if any. */
  __collection?: Set<TaskTargetPipelineHelper>;

  /**
   * Turns a plain TaskTarget[] into a helper by swapping its prototype in
   * place (the same array object is returned). Arrays that already are
   * helpers are returned as-is.
   */
  static pipeline(t: TaskTarget[]): TaskTargetPipelineHelper {
    if (Object.getPrototypeOf(t) === TaskTargetPipelineHelper.prototype) {
      return t as any; // Already done
    }
    Object.setPrototypeOf(t, TaskTargetPipelineHelper.prototype);
    return t as any;
  }

  /** Applies a synchronous transform and hands collection membership to its result. */
  _fn(fn: (t: TaskTarget[])=>TaskTarget[]): TaskTargetPipelineHelper {
    const p = TaskTargetPipelineHelper.pipeline(this);
    const t = fn(p);
    const p2 = TaskTargetPipelineHelper.pipeline(t);
    collectionSwap(p, p2); // Move collection pointer to the new item, ends always end up in the collection
    return p2;
  }

  /** Async counterpart of `_fn`. */
  async _afn(fn: (t: TaskTarget[])=>Promise<TaskTarget[]>): Promise<TaskTargetPipelineHelper> {
    const p = TaskTargetPipelineHelper.pipeline(this);
    const t = await fn(p);
    const p2 = TaskTargetPipelineHelper.pipeline(t);
    collectionSwap(p, p2); // Move collection pointer to the new item, ends always end up in the collection
    return p2;
  }

  cd(path: string): TaskTargetPipelineHelper {
    return this._fn(t => cd(t, path));
  }

  glob(globPath: string): TaskTargetPipelineHelper {
    return this._fn(t => glob(t, globPath));
  }

  async unzip(): Promise<TaskTargetPipelineHelper> {
    return this._afn(unzip);
  }

  read(): TaskTargetPipelineHelper {
    return this._fn(read);
  }

  cmd(_cmd: ValidCmd): TaskTargetPipelineHelper {
    return this._fn(t => cmd(t, _cmd));
  }

  setId(id: ValidId): TaskTargetPipelineHelper {
    return this._fn(t => setId(t, id));
  }

  /** Placeholder: accepts any arguments, does nothing, returns `this`. */
  types(...args: any[]) {
    // TODO: no-op
    return this;
  }

  /** Placeholder: accepts any arguments, does nothing, returns `this`. */
  csvSink(...args: any[]) {
    // TODO: no-op
    return this;
  }

  /**
   * Registers this pipeline in `_c`; later transforms move the membership
   * to their result.
   * @todo Nested versions of this don't currently work, but they could if we
   * turn __collection into an array of collections
   */
  collect(_c: Set<TaskTargetPipelineHelper>) {
    this.__collection = _c;
    // Add right away so a chain that ends with collect() is still collected;
    // collectionSwap removes this entry again if more transforms follow.
    _c.add(this);
    return this;
  }
}
/**
 * Runs the target's rendered pipeline with `bash -c` via zx and resolves to
 * the finished process's output (what awaiting a ProcessPromise yields).
 * With `nothrow: true`, a non-zero exit status resolves normally instead of
 * rejecting, so callers should check the exit code on the result.
 */
export async function run(target: TaskTarget): Promise<Awaited<ProcessPromise>> {
  const command = target.toShell();
  // zx escapes interpolated values, so the whole pipeline reaches bash as the single -c argument.
  return await $({ nothrow: true })`bash -c ${command}`;
}