Attempt to handle api race condition

This commit is contained in:
Essem 2022-06-07 18:27:17 -05:00
parent b5ed2cc62a
commit a362b79ed4
No known key found for this signature in database
GPG key ID: 7D497397CC3A2A8C

View file

@ -8,20 +8,11 @@ import { WebSocketServer } from "ws";
import { fileURLToPath } from "url"; import { fileURLToPath } from "url";
import { dirname } from "path"; import { dirname } from "path";
import { createRequire } from "module"; import { createRequire } from "module";
import EventEmitter from "events";
const nodeRequire = createRequire(import.meta.url); const nodeRequire = createRequire(import.meta.url);
const magick = nodeRequire(`../build/${process.env.DEBUG && process.env.DEBUG === "true" ? "Debug" : "Release"}/image.node`); const magick = nodeRequire(`../build/${process.env.DEBUG && process.env.DEBUG === "true" ? "Debug" : "Release"}/image.node`);
/*
Rerror 0x01
Tqueue 0x02
Rqueue 0x03
Tcancel 0x04
Rcancel 0x05
Twait 0x06
Rwait 0x07
Rinit 0x08
*/
const Rerror = 0x01; const Rerror = 0x01;
const Tqueue = 0x02; const Tqueue = 0x02;
const Rqueue = 0x03; const Rqueue = 0x03;
@ -85,6 +76,13 @@ const acceptJob = (id, sock) => {
}); });
}; };
const waitForVerify = (event) => {
return new Promise((resolve, reject) => {
event.once("end", (r) => resolve(r));
event.once("error", (e) => reject(e));
});
};
const wss = new WebSocketServer({ clientTracking: true, noServer: true }); const wss = new WebSocketServer({ clientTracking: true, noServer: true });
wss.on("connection", (ws, request) => { wss.on("connection", (ws, request) => {
@ -111,7 +109,7 @@ wss.on("connection", (ws, request) => {
if (opcode == Tqueue) { if (opcode == Tqueue) {
const id = msg.readUInt32LE(3); const id = msg.readUInt32LE(3);
const obj = msg.slice(7); const obj = msg.slice(7);
const job = { msg: obj, num: jobAmount }; const job = { msg: obj, num: jobAmount, verifyEvent: new EventEmitter() };
jobs.set(id, job); jobs.set(id, job);
queue.push(id); queue.push(id);
@ -138,11 +136,13 @@ wss.on("connection", (ws, request) => {
return; return;
} }
if (job.error) { if (job.error) {
job.verifyEvent.emit("error", job.error);
jobs.delete(id); jobs.delete(id);
const errorResponse = Buffer.concat([Buffer.from([Rerror]), tag, Buffer.from(job.error)]); const errorResponse = Buffer.concat([Buffer.from([Rerror]), tag, Buffer.from(job.error)]);
ws.send(errorResponse); ws.send(errorResponse);
return; return;
} }
job.verifyEvent.emit("end", tag);
job.tag = tag; job.tag = tag;
jobs.set(id, job); jobs.set(id, job);
//const waitResponse = Buffer.concat([Buffer.from([Rwait]), tag]); //const waitResponse = Buffer.concat([Buffer.from([Rwait]), tag]);
@ -264,10 +264,18 @@ const runJob = (job, ws) => {
const jobObject = jobs.get(job.id); const jobObject = jobs.get(job.id);
jobObject.data = data.buffer; jobObject.data = data.buffer;
jobObject.ext = data.fileExtension; jobObject.ext = data.fileExtension;
jobs.set(job.id, jobObject); let verifyPromise;
const waitResponse = Buffer.concat([Buffer.from([Rwait]), jobObject.tag]); if (!jobObject.tag) {
ws.send(waitResponse); verifyPromise = waitForVerify(jobObject.verifyEvent);
resolve(); } else {
verifyPromise = Promise.resolve(jobObject.tag);
}
verifyPromise.then(tag => {
jobs.set(job.id, jobObject);
const waitResponse = Buffer.concat([Buffer.from([Rwait]), tag]);
ws.send(waitResponse);
resolve();
});
}); });
worker.once("error", (e) => { worker.once("error", (e) => {
clearTimeout(timeout); clearTimeout(timeout);