base-data-manager/data-export/parallel.ts

86 lines
2.4 KiB
TypeScript

import { $, type ProcessOutput } from 'zx';
import os from 'os';
import { type TaskTarget, run } from "./task.ts";
// Module-level side effect: switches off the `verbose` flag on the `$` object
// imported from zx as soon as this module is loaded.
$.verbose = false;
// Result of a `parallel` run: each entry maps the `id` of a finished
// TaskTarget to the ProcessOutput that `run` produced for it.
type ResultMap = Map<string, ProcessOutput>;
/**
 * Runs every target through `run`, keeping at most `maxConcurrency` jobs in
 * flight, and reports progress on stderr.
 *
 * @param targets Jobs to execute. The array is snapshotted up front, so later
 *   mutation by the caller does not affect scheduling.
 * @param quiet When true, the in-place progress line is suppressed. The final
 *   summary is still written.
 * @param maxConcurrency Upper bound on simultaneously running jobs. Values
 *   that are not greater than zero (0, negatives, NaN) fall back to 1 so the
 *   scheduler can always make progress.
 * @returns A map from each target's `id` to the output `run` resolved with.
 *   Targets sharing an `id` overwrite one another.
 * @throws Re-throws the first error `run` rejected with. No further jobs are
 *   started after a rejection, and jobs already in flight are allowed to
 *   settle before the error is re-thrown.
 */
export async function parallel(
  targets: TaskTarget[],
  quiet: boolean = false,
  maxConcurrency: number = os.cpus().length
): Promise<ResultMap> {
  const results: ResultMap = new Map();
  const total = targets.length;
  const startTime = Date.now();

  // A limit of 0, a negative number or NaN would never admit a job and the
  // scheduling loop would spin forever; `os.cpus()` can legitimately be empty.
  const limit = maxConcurrency > 0 ? maxConcurrency : 1;

  let completed = 0;
  let running = 0;
  // Running aggregates replace a per-job duration array, so formatting the
  // status line is O(1) instead of re-summing every finished job.
  let totalDurationMs = 0;
  let lastDurationMs = 0;

  // Errors `run` rejected with, in the order they were observed.
  const errors: unknown[] = [];
  const inFlight = new Set<Promise<void>>();

  /** Builds the progress line; durations are treated as milliseconds. */
  function formatEta(): string {
    const left = total - completed;
    const avgSeconds = completed > 0 ? totalDurationMs / completed / 1000 : 0;
    const etaSeconds = Math.round(left * avgSeconds);
    const pct = total > 0 ? Math.round((completed / total) * 100) : 100;
    const lastDuration = (lastDurationMs / 1000).toFixed(1);
    return `ETA: ${etaSeconds}s Left: ${left} AVG: ${avgSeconds.toFixed(2)}s local:${running}/${completed}/${pct}%/${lastDuration}s`;
  }

  /** Redraws the progress line in place (leading `\r`) unless `quiet`. */
  function printStatus(): void {
    if (quiet) {
      return;
    }
    process.stderr.write(`\r${formatEta()}`.padEnd(80));
  }

  /** Executes one target and records its output and timing. */
  async function runJob(target: TaskTarget): Promise<void> {
    running++;
    printStatus();
    try {
      const output = await run(target);
      totalDurationMs += output.duration;
      lastDurationMs = output.duration;
      results.set(target.id, output);
      completed++;
    } finally {
      // Keep the running counter honest even when `run` rejects.
      running--;
    }
    printStatus();
  }

  /**
   * Starts a job and tracks it in `inFlight`. The tracked promise never
   * rejects: failures are collected in `errors`, so no rejection is left
   * unhandled and the slot is always released.
   */
  function launch(target: TaskTarget): void {
    const tracked: Promise<void> = (async () => {
      try {
        await runJob(target);
      } catch (error: unknown) {
        errors.push(error);
      } finally {
        inFlight.delete(tracked);
      }
    })();
    inFlight.add(tracked);
  }

  for (const target of targets.slice()) {
    // At capacity: wait until at least one job releases its slot.
    while (inFlight.size >= limit) {
      await Promise.race(inFlight);
    }
    if (errors.length > 0) {
      break;
    }
    launch(target);
  }
  // Drain whatever is still running; tracked promises cannot reject.
  await Promise.all(inFlight);

  if (errors.length > 0) {
    if (!quiet) {
      // Terminate the in-place status line before the error surfaces.
      process.stderr.write('\n');
    }
    throw errors[0];
  }

  // Final status line
  process.stderr.write('\n');
  const totalSeconds = ((Date.now() - startTime) / 1000).toFixed(1);
  // Plain iteration avoids depending on ES2025 iterator helpers.
  let failedCount = 0;
  for (const output of results.values()) {
    if (!output.ok) {
      failedCount++;
    }
  }
  process.stderr.write(
    `\nCompleted ${total} jobs in ${totalSeconds}s (${failedCount} failed)\n`
  );
  return results;
}