Fixed tag parameter stuff, added tag to Rinit, added format checking

This commit is contained in:
Essem 2021-11-29 15:27:13 -06:00
parent 19638966bb
commit 871979105c
No known key found for this signature in database
GPG key ID: 7D497397CC3A2A8C
5 changed files with 30 additions and 26 deletions

View file

@ -23,11 +23,11 @@ A client sends *requests* (T-messages) to a server, which subsequently *replies*
[s] means a string that goes until the end of the message. [s] means a string that goes until the end of the message.
[j] means JSON data that goes until the end of the message. [j] means JSON data that goes until the end of the message.
`tag` is used to identify a request/response pair, like `lock` in the original API. `jid` is used to identify a job. `job` is a job object. `tag` is used to identify a request/response pair, like `lock` in the original API. `jid` is used to identify a job. `job` is a job object.
- Rerror tag[4] error[s] - Rerror tag[2] error[s]
- Tqueue tag[4] jid[4] job[j] - Tqueue tag[2] jid[4] job[j]
- Rqueue tag[4] - Rqueue tag[2]
- Tcancel tag[4] jid[4] - Tcancel tag[2] jid[4]
- Rcancel tag[4] - Rcancel tag[2]
- Twait tag[4] jid[4] - Twait tag[2] jid[4]
- Rwait tag[4] - Rwait tag[2]
- Rinit max_jobs[2] formats[j] - Rinit tag[2] max_jobs[2] formats[j]

View file

@ -88,7 +88,11 @@ wss.on("connection", (ws, request) => {
log(`WS client ${request.socket.remoteAddress}:${request.socket.remotePort} has connected`); log(`WS client ${request.socket.remoteAddress}:${request.socket.remotePort} has connected`);
const num = Buffer.alloc(2); const num = Buffer.alloc(2);
num.writeUInt16LE(MAX_JOBS); num.writeUInt16LE(MAX_JOBS);
const init = Buffer.concat([Buffer.from([Rinit]), num, Buffer.from(JSON.stringify(Object.keys(magick)))]); const formats = {};
for (const cmd of Object.keys(magick)) {
formats[cmd] = ["image/png", "image/gif", "image/jpeg", "image/webp"];
}
const init = Buffer.concat([Buffer.from([Rinit]), Buffer.from([0x00, 0x00]), num, Buffer.from(JSON.stringify(formats))]);
ws.send(init); ws.send(init);
ws.on("error", (err) => { ws.on("error", (err) => {
@ -97,11 +101,11 @@ wss.on("connection", (ws, request) => {
ws.on("message", (msg) => { ws.on("message", (msg) => {
const opcode = msg.readUint8(0); const opcode = msg.readUint8(0);
const tag = msg.slice(1, 5); const tag = msg.slice(1, 3);
const req = msg.toString().slice(5, msg.length); const req = msg.toString().slice(3);
if (opcode == Tqueue) { if (opcode == Tqueue) {
const id = msg.readUInt32LE(5); const id = msg.readUInt32LE(3);
const obj = msg.slice(9, msg.length); const obj = msg.slice(7);
const job = { msg: obj, num: jobAmount }; const job = { msg: obj, num: jobAmount };
jobs.set(id, job); jobs.set(id, job);
queue.push(id); queue.push(id);
@ -121,7 +125,7 @@ wss.on("connection", (ws, request) => {
const cancelResponse = Buffer.concat([Buffer.from([Rcancel]), tag]); const cancelResponse = Buffer.concat([Buffer.from([Rcancel]), tag]);
ws.send(cancelResponse); ws.send(cancelResponse);
} else if (opcode == Twait) { } else if (opcode == Twait) {
const id = msg.readUInt32LE(5); const id = msg.readUInt32LE(3);
const job = jobs.get(id); const job = jobs.get(id);
if (!job) { if (!job) {
const errorResponse = Buffer.concat([Buffer.from([Rerror]), tag, Buffer.from("Invalid job ID")]); const errorResponse = Buffer.concat([Buffer.from([Rerror]), tag, Buffer.from("Invalid job ID")]);

View file

@ -2,7 +2,7 @@ import fetch from "node-fetch";
import fs from "fs"; import fs from "fs";
import fileType from "file-type"; import fileType from "file-type";
const formats = ["image/jpeg", "image/png", "image/webp", "image/gif", "video/mp4", "video/webm", "video/mov"]; const formats = ["image/jpeg", "image/png", "image/webp", "image/gif", "video/mp4", "video/webm", "video/quicktime"];
export const jobs = {}; export const jobs = {};

View file

@ -59,18 +59,18 @@ class ImageConnection {
onMessage(msg) { onMessage(msg) {
const op = msg.readUint8(0); const op = msg.readUint8(0);
if (op === Rinit) { if (op === Rinit) {
this.max = msg.readUint16LE(1); this.max = msg.readUint16LE(3);
this.formats = JSON.parse(msg.toString("utf8", 3)); this.formats = JSON.parse(msg.toString("utf8", 5));
return; return;
} }
const tag = msg.readUint32LE(1); const tag = msg.readUint16LE(1);
const promise = this.requests.get(tag); const promise = this.requests.get(tag);
this.requests.delete(tag); this.requests.delete(tag);
if (op === Rqueue) this.njobs++; if (op === Rqueue) this.njobs++;
if (op === Rcancel || op === Rwait) this.njobs--; if (op === Rcancel || op === Rwait) this.njobs--;
if (op === Rerror) { if (op === Rerror) {
this.njobs--; this.njobs--;
promise.reject(new Error(msg.slice(5, msg.length).toString())); promise.reject(new Error(msg.slice(3, msg.length).toString()));
return; return;
} }
promise.resolve(); promise.resolve();
@ -107,10 +107,9 @@ class ImageConnection {
queue(jobid, jobobj) { queue(jobid, jobobj) {
const str = JSON.stringify(jobobj); const str = JSON.stringify(jobobj);
const buf = Buffer.alloc(4 + str.length); const buf = Buffer.alloc(4);
buf.writeUint32LE(jobid); buf.writeUint32LE(jobid);
buf.write(str, 4); return this.do(Tqueue, Buffer.concat([buf, Buffer.from(str)]));
return this.do(Tqueue, buf);
} }
wait(jobid) { wait(jobid) {
@ -151,10 +150,10 @@ class ImageConnection {
} }
async do(op, data) { async do(op, data) {
const buf = Buffer.alloc(1 + 4); const buf = Buffer.alloc(1 + 2);
const tag = this.tag++; const tag = this.tag++;
buf.writeUint8(op); buf.writeUint8(op);
buf.writeUint32LE(tag, 1); buf.writeUint16LE(tag, 1);
this.conn.send(Buffer.concat([buf, data])); this.conn.send(Buffer.concat([buf, data]));
const promise = new Promise((resolve, reject) => { const promise = new Promise((resolve, reject) => {
this.requests.set(tag, { resolve, reject }); this.requests.set(tag, { resolve, reject });

View file

@ -65,12 +65,13 @@ class ImageWorker extends BaseServiceWorker {
return sorted[0]; return sorted[0];
} }
async getIdeal() { async getIdeal(object) {
const idealServers = []; const idealServers = [];
for (const [address, connection] of this.connections) { for (const [address, connection] of this.connections) {
if (connection.conn.readyState !== 0 && connection.conn.readyState !== 1) { if (connection.conn.readyState !== 0 && connection.conn.readyState !== 1) {
continue; continue;
} }
if (!connection.formats[object.cmd].includes(object.type)) continue;
idealServers.push({ idealServers.push({
addr: address, addr: address,
load: connection.njobs / connection.max load: connection.njobs / connection.max
@ -108,7 +109,7 @@ class ImageWorker extends BaseServiceWorker {
async run(object) { async run(object) {
if (process.env.API === "true") { if (process.env.API === "true") {
const num = this.nextID++; const num = this.nextID++;
const currentServer = await this.getIdeal(); const currentServer = await this.getIdeal(object);
await currentServer.queue(num, object); await currentServer.queue(num, object);
await currentServer.wait(num); await currentServer.wait(num);
const output = await currentServer.getOutput(num); const output = await currentServer.getOutput(num);