base-data-manager/data-export/parallel.ts
cobertos f6d0427a45 Converted TaskTargetPipelineHelper to more functional style, added aggregate() functionality to bring together multiple exports (no tests, but works)
* made parallel generic (not tied to TaskTarget)
* pulled common higher-order/frontend operations into io.ts
* split timelinize specific functionality into own file
* Tests made to pass and match previous facebook export snapshots _exactly_
2026-02-26 00:14:10 -05:00

95 lines
2.7 KiB
TypeScript

import os from 'os';
/**Generic parallel runner with optional logging
 * Runs `targets` with `runFn`, keeping at most `maxConcurrency` jobs in flight at a time.
 * Shaped in a way that expects generally something that returns zx.ProcessOutput (or
 * something with .duration and .ok built-in to the return)
 *
 * Unless `quiet` is set, a single-line progress/ETA status is redrawn on stderr as jobs
 * start and finish, followed by a summary line with total wall time and failure count.
 *
 * @param targets Inputs to run; each element (duplicates included) is passed to `runFn` once.
 * @param runFn Should NOT throw. Return { ok: false } instead. If it does reject, the
 *   rejection propagates out of `parallel` and jobs already started keep running unobserved.
 * @param quiet When true, nothing is written to stderr.
 * @param maxConcurrency Maximum number of simultaneous jobs. Values below 1 (or NaN) are
 *   treated as 1; `Infinity` starts every job at once.
 * @returns One result per target, in the same order as `targets`.
 */
export async function parallel<T, R extends { duration: number, ok: boolean }>(
  targets: T[],
  runFn: (t: T)=>Promise<R>,
  quiet: boolean = false,
  maxConcurrency: number = os.cpus().length
): Promise<R[]> {
  const total = targets.length;
  // Clamp so 0 (os.cpus() may report no CPUs on some platforms), negatives, or NaN can't
  // stall the scheduler below; Infinity passes through as "no limit".
  const limit = maxConcurrency >= 1 ? Math.floor(maxConcurrency) : 1;
  // Results are stored by input position rather than keyed by target, so duplicate
  // targets each keep their own result and the output order matches `targets`.
  const results = new Array<R>(total);
  const inFlight = new Set<Promise<void>>();
  const startTime = Date.now();
  let completed = 0;
  let running = 0;
  // Running totals (instead of an array of every duration) keep each status redraw O(1).
  let durationSum = 0;
  let lastDuration = 0;

  function formatEta(): string {
    const left = total - completed;
    // Durations are treated as milliseconds; displayed values are in seconds.
    const avgSeconds = completed > 0 ? durationSum / completed / 1000 : 0;
    // Rough estimate: multiplies remaining jobs by the average duration without
    // accounting for concurrency.
    const etaSeconds = Math.round(left * avgSeconds);
    const pct = total > 0 ? Math.round((completed / total) * 100) : 100;
    const lastSeconds = (lastDuration / 1000).toFixed(1);
    return `ETA: ${etaSeconds}s Left: ${left} AVG: ${avgSeconds.toFixed(2)}s local:${running}/${completed}/${pct}%/${lastSeconds}s`;
  }

  function printStatus(): void {
    if (quiet) {
      return;
    }
    // `\r` returns to the start of the line; padding overwrites leftovers of a longer previous status.
    process.stderr.write(`\r${formatEta()}`.padEnd(80));
  }

  async function runJob(target: T, index: number): Promise<void> {
    running++;
    printStatus();
    const result = await runFn(target);
    durationSum += result.duration;
    lastDuration = result.duration;
    results[index] = result;
    running--;
    completed++;
    printStatus();
  }

  for (const [index, target] of targets.entries()) {
    // Wait for a free slot. Each in-flight promise removes itself from the set before it
    // resolves, so the set has shrunk by the time the race settles.
    while (inFlight.size >= limit) {
      await Promise.race(inFlight);
    }
    const job: Promise<void> = runJob(target, index).then(() => {
      inFlight.delete(job);
    });
    inFlight.add(job);
  }
  // Drain the jobs still running after the last one was launched.
  await Promise.all(inFlight);

  if (!quiet) {
    const totalSeconds = ((Date.now() - startTime) / 1000).toFixed(1);
    const failedCount = results.filter(r => !r.ok).length;
    // Terminate the in-place status line, then print the summary.
    process.stderr.write('\n');
    process.stderr.write(
      `\nCompleted ${total} jobs in ${totalSeconds}s (${failedCount} failed)\n`
    );
  }
  return results;
}