* made parallel generic (not tied to TaskTarget) * pulled common higher-order/frontend operations into io.ts * split timelinize specific functionality into own file * Tests made to pass and match previous facebook export snapshots _exactly_
95 lines
2.7 KiB
TypeScript
95 lines
2.7 KiB
TypeScript
import os from 'os';
|
|
|
|
/**
 * Generic parallel runner with optional progress logging.
 *
 * Calls `runFn` once per entry of `targets`, keeping at most `maxConcurrency`
 * calls in flight at a time. Shaped for a `runFn` that resolves to something
 * like zx.ProcessOutput, i.e. a value that carries `.duration` (treated as
 * milliseconds by the progress line) and `.ok`.
 *
 * Unless `quiet` is set, a single-line progress/ETA readout is rewritten on
 * stderr whenever a job starts or finishes, followed by a summary line once
 * everything is done.
 *
 * @param targets Inputs to process. A snapshot is taken up front, so mutating
 *   the array while jobs are running does not affect this call. Duplicate
 *   entries are allowed; each one gets its own result.
 * @param runFn Should NOT throw or reject; resolve with `{ ok: false }`
 *   instead. A rejection is not caught here: it rejects the returned promise
 *   while jobs that were already started keep running.
 * @param quiet When true, nothing is written to stderr.
 * @param maxConcurrency Upper bound on simultaneous `runFn` calls. Defaults to
 *   the CPU count reported by `os.cpus()`. Values below 1 (or NaN) are treated
 *   as 1 so the queue always makes progress.
 * @returns One result per target, index-aligned with `targets`.
 */
export async function parallel<T, R extends { duration: number, ok: boolean }>(
  targets: T[],
  runFn: (t: T) => Promise<R>,
  quiet: boolean = false,
  maxConcurrency: number = os.cpus().length
): Promise<R[]> {
  const queue = targets.slice();
  const total = queue.length;

  // Results are stored by input index rather than keyed by target, so equal
  // (duplicate) targets cannot overwrite each other's result.
  const results: R[] = [];

  // `os.cpus()` can report zero CPUs and callers can pass 0/negative/NaN; any
  // of those would otherwise spin the scheduling loop without starting a job.
  const limit = maxConcurrency >= 1 ? maxConcurrency : 1;

  let completed = 0;
  let running = 0;
  let finishedDurationMs = 0; // sum of `.duration` over finished jobs
  let lastDurationMs = 0; // `.duration` of the most recently finished job
  const startTime = Date.now();

  const inFlight = new Set<Promise<void>>();

  /** Builds the progress line from the current counters. */
  function formatEta(): string {
    const left = total - completed;
    const avgSeconds = completed > 0 ? finishedDurationMs / completed / 1000 : 0;
    const etaSeconds = Math.round(left * avgSeconds);
    const pct = total > 0 ? Math.round((completed / total) * 100) : 100;
    const lastDuration = (lastDurationMs / 1000).toFixed(1);

    return `ETA: ${etaSeconds}s Left: ${left} AVG: ${avgSeconds.toFixed(2)}s local:${running}/${completed}/${pct}%/${lastDuration}s`;
  }

  /** Rewrites the progress line in place on stderr (no-op when quiet). */
  function printStatus(): void {
    if (quiet) {
      return;
    }
    // The leading `\r` returns to column 0; padding to 80 columns covers up
    // leftovers from a previously longer line.
    process.stderr.write(`\r${formatEta()}`.padEnd(80));
  }

  /** Runs one target and records its result at the target's input index. */
  async function runJob(index: number, target: T): Promise<void> {
    running++;
    printStatus();

    const result = await runFn(target);
    results[index] = result;
    finishedDurationMs += result.duration;
    lastDurationMs = result.duration;

    running--;
    completed++;
    printStatus();
  }

  // Walk the snapshot with an iterator so each job gets a typed
  // [index, target] pair without re-indexing (or shifting) the array.
  const pending = queue.entries();
  let next = pending.next();

  while (!next.done || inFlight.size > 0) {
    // Fill up to max concurrency.
    while (!next.done && inFlight.size < limit) {
      const [index, target] = next.value;
      const job: Promise<void> = runJob(index, target).then(() => {
        inFlight.delete(job);
      });
      inFlight.add(job);
      next = pending.next();
    }

    // Wait for at least one job to finish before topping up again.
    if (inFlight.size > 0) {
      await Promise.race(inFlight);
    }
  }

  if (!quiet) {
    // Terminate the in-place progress line, then print the summary.
    process.stderr.write('\n');
    const totalSeconds = ((Date.now() - startTime) / 1000).toFixed(1);
    const failedCount = results.filter(r => !r.ok).length;
    process.stderr.write(
      `\nCompleted ${total} jobs in ${totalSeconds}s (${failedCount} failed)\n`
    );
  }

  return results;
}
|