diff --git a/api/index.js b/api/index.js index 3cad9e0..32a7869 100644 --- a/api/index.js +++ b/api/index.js @@ -4,9 +4,32 @@ import { cpus } from "os"; import { Worker } from "worker_threads"; import { join } from "path"; import { createServer } from "http"; -import ws from "ws"; +import { WebSocketServer } from "ws"; import { fileURLToPath } from "url"; import { dirname } from "path"; +import { createRequire } from "module"; + +const nodeRequire = createRequire(import.meta.url); +const magick = nodeRequire(`../build/${process.env.DEBUG && process.env.DEBUG === "true" ? "Debug" : "Release"}/image.node`); + +/* +Rerror 0x01 +Tqueue 0x02 +Rqueue 0x03 +Tcancel 0x04 +Rcancel 0x05 +Twait 0x06 +Rwait 0x07 +Rinit 0x08 +*/ +const Rerror = 0x01; +const Tqueue = 0x02; +const Rqueue = 0x03; +const Tcancel = 0x04; +const Rcancel = 0x05; +const Twait = 0x06; +const Rwait = 0x07; +const Rinit = 0x08; const start = process.hrtime(); const log = (msg, jobNum) => { @@ -23,30 +46,34 @@ class JobCache extends Map { } const jobs = new JobCache(); -// Should look like UUID : { msg: "request", num: } +// Should look like ID : { msg: "request", num: } const queue = []; -// Array of UUIDs - -import { v4 as uuidv4 } from "uuid"; +// Array of IDs const MAX_JOBS = process.env.JOBS !== "" && process.env.JOBS !== undefined ? parseInt(process.env.JOBS) : cpus().length * 4; // Completely arbitrary, should usually be some multiple of your amount of cores const PASS = process.env.PASS !== "" && process.env.PASS !== undefined ? process.env.PASS : undefined; let jobAmount = 0; -const acceptJob = (uuid, sock) => { +const acceptJob = (id, sock) => { jobAmount++; queue.shift(); - const job = jobs.get(uuid); + const job = jobs.get(id); return runJob({ - uuid: uuid, + id: id, msg: job.msg, num: job.num }, sock).then(() => { - log(`Job ${uuid} has finished`); + log(`Job ${id} has finished`); }).catch((err) => { - console.error(`Error on job ${uuid}:`, err); - jobs.delete(uuid); - sock.send(Buffer.concat([Buffer.from([0x2]), Buffer.from(uuid), Buffer.from(err.message)])); + console.error(`Error on job ${id}:`, err); + const newJob = jobs.get(id); + if (!newJob.tag) { + newJob.error = err.message; + jobs.set(id, newJob); + return; + } + jobs.delete(id); + sock.send(Buffer.concat([Buffer.from([Rerror]), newJob.tag, Buffer.from(err.message)])); }).finally(() => { jobAmount--; if (queue.length > 0) { @@ -55,42 +82,63 @@ const acceptJob = (uuid, sock) => { }); }; -const wss = new ws.Server({ clientTracking: true, noServer: true }); +const wss = new WebSocketServer({ clientTracking: true, noServer: true }); wss.on("connection", (ws, request) => { log(`WS client ${request.socket.remoteAddress}:${request.socket.remotePort} has connected`); + ws.on("open", () => { + const init = Buffer.concat([Buffer.from([Rinit]), Buffer.from(MAX_JOBS), Buffer.from(JSON.stringify(Object.keys(magick)))]); + ws.send(init); + }); + ws.on("error", (err) => { console.error(err); }); ws.on("message", (msg) => { const opcode = msg.readUint8(0); - const req = msg.toString().slice(1,msg.length); - console.log(req); - // 0x00 == Cancel job - // 0x01 == Queue job - if (opcode == 0x00) { - delete queue[queue.indexOf(req) - 1]; - jobs.delete(req); - } else if (opcode == 0x01) { - const length = parseInt(req.slice(0, 1)); - const num = req.slice(1, length + 1); - const obj = req.slice(length + 1); + const tag = msg.slice(1, 5); + const req = msg.toString().slice(5, msg.length); + if (opcode == Tqueue) { + const id = msg.readUInt32LE(5); + const obj = msg.slice(9, msg.length); const job = { msg: obj, num: jobAmount }; - const uuid = uuidv4(); - jobs.set(uuid, job); - queue.push(uuid); + jobs.set(id, job); + queue.push(id); - const newBuffer = Buffer.concat([Buffer.from([0x00]), Buffer.from(uuid), Buffer.from(num)]); + const newBuffer = Buffer.concat([Buffer.from([Rqueue]), tag]); ws.send(newBuffer); if (jobAmount < MAX_JOBS) { - log(`Got WS request for job ${job.msg} with id ${uuid}`, job.num); - acceptJob(uuid, ws); + log(`Got WS request for job ${job.msg} with id ${id}`, job.num); + acceptJob(id, ws); } else { - log(`Got WS request for job ${job.msg} with id ${uuid}, queued in position ${queue.indexOf(uuid)}`, job.num); + log(`Got WS request for job ${job.msg} with id ${id}, queued in position ${queue.indexOf(id)}`, job.num); } + } else if (opcode == Tcancel) { + delete queue[queue.indexOf(req) - 1]; + jobs.delete(req); + const cancelResponse = Buffer.concat([Buffer.from([Rcancel]), tag]); + ws.send(cancelResponse); + } else if (opcode == Twait) { + const id = msg.readUInt32LE(5); + const job = jobs.get(id); + if (!job) { + const errorResponse = Buffer.concat([Buffer.from([Rerror]), tag, Buffer.from("Invalid job ID")]); + ws.send(errorResponse); + return; + } + if (job.error) { + jobs.delete(id); + const errorResponse = Buffer.concat([Buffer.from([Rerror]), tag, Buffer.from(job.error)]); + ws.send(errorResponse); + return; + } + job.tag = tag; + jobs.set(id, job); + //const waitResponse = Buffer.concat([Buffer.from([Rwait]), tag]); + //ws.send(waitResponse); } else { log("Could not parse WS message"); } @@ -117,41 +165,35 @@ httpServer.on("request", async (req, res) => { return res.end("401 Unauthorized"); } const reqUrl = new URL(req.url, `http://${req.headers.host}`); - if (reqUrl.pathname === "/status" && req.method === "GET") { - log(`Sending server status to ${req.socket.remoteAddress}:${req.socket.remotePort} via HTTP`); - const statusObject = { - load: MAX_JOBS - jobAmount, - queued: queue.length - }; - return res.end(JSON.stringify(statusObject)); - } else if (reqUrl.pathname === "/running" && req.method === "GET") { - log(`Sending currently running jobs to ${req.socket.remoteAddress}:${req.socket.remotePort} via HTTP`); - const keys = jobs.keys(); - const newObject = { queued: queue.length, runningJobs: jobAmount, max: MAX_JOBS }; - for (const key of keys) { - const validKeys = Object.keys(jobs.get(key)).filter((value) => value !== "addr" && value !== "port" && value !== "data" && value !== "ext"); - newObject[key] = {}; - for (const validKey of validKeys) { - if (validKey === "msg") { - newObject[key][validKey] = JSON.parse(jobs.get(key)[validKey]); - } else { - newObject[key][validKey] = jobs.get(key)[validKey]; - } - } - } - return res.end(JSON.stringify(newObject)); - } else if (reqUrl.pathname === "/image" && req.method === "GET") { + if (reqUrl.pathname === "/image" && req.method === "GET") { if (!reqUrl.searchParams.has("id")) { res.statusCode = 400; return res.end("400 Bad Request"); } - const id = reqUrl.searchParams.get("id"); + const id = parseInt(reqUrl.searchParams.get("id")); if (!jobs.has(id)) { res.statusCode = 410; return res.end("410 Gone"); } log(`Sending image data for job ${id} to ${req.socket.remoteAddress}:${req.socket.remotePort} via HTTP`); - res.setHeader("ext", jobs.get(id).ext); + const ext = jobs.get(id).ext; + let contentType; + switch (ext) { + case "gif": + contentType = "image/gif"; + break; + case "png": + contentType = "image/png"; + break; + case "jpeg": + case "jpg": + contentType = "image/jpeg"; + break; + case "webp": + contentType = "image/webp"; + break; + } + res.setHeader("Content-Type", contentType); const data = jobs.get(id).data; jobs.delete(id); return res.end(data, (err) => { @@ -189,9 +231,9 @@ httpServer.listen(8080, () => { log("HTTP and WS listening on port 8080"); }); -const runJob = (job, sock) => { +const runJob = (job, ws) => { return new Promise((resolve, reject) => { - log(`Job ${job.uuid} starting...`, job.num); + log(`Job ${job.id} starting...`, job.num); const object = JSON.parse(job.msg); // If the image has a path, it must also have a type @@ -206,17 +248,16 @@ const runJob = (job, sock) => { worker.terminate(); reject(new Error("Job timed out")); }, 900000); - log(`Job ${job.uuid} started`, job.num); + log(`Job ${job.id} started`, job.num); worker.once("message", (data) => { clearTimeout(timeout); - log(`Sending result of job ${job.uuid} back to the bot`, job.num); - const jobObject = jobs.get(job.uuid); + log(`Sending result of job ${job.id} back to the bot`, job.num); + const jobObject = jobs.get(job.id); jobObject.data = data.buffer; jobObject.ext = data.fileExtension; - jobs.set(job.uuid, jobObject); - sock.send(Buffer.concat([Buffer.from([0x1]), Buffer.from(job.uuid)]), () => { - return resolve(); - }); + jobs.set(job.id, jobObject); + const waitResponse = Buffer.concat([Buffer.from([Rwait]), jobObject.tag]); + ws.send(waitResponse); }); worker.once("error", (e) => { clearTimeout(timeout); diff --git a/utils/imageConnection.js b/utils/imageConnection.js new file mode 100644 index 0000000..4181625 --- /dev/null +++ b/utils/imageConnection.js @@ -0,0 +1,163 @@ +import fetch from "node-fetch"; +import WebSocket from "ws"; +import * as logger from "./logger.js"; +import { setTimeout } from "timers/promises"; + +/* +Rerror 0x01 +Tqueue 0x02 +Rqueue 0x03 +Tcancel 0x04 +Rcancel 0x05 +Twait 0x06 +Rwait 0x07 +Rinit 0x08 +*/ +const Rerror = 0x01; +const Tqueue = 0x02; +//const Rqueue = 0x03; +const Tcancel = 0x04; +//const Rcancel = 0x05; +const Twait = 0x06; +//const Rwait = 0x07; +const Rinit = 0x08; + +class ImageConnection { + constructor(host, auth, tls = false) { + this.requests = new Map(); + this.host = host; + this.auth = auth; + this.tag = null; + this.disconnected = false; + this.njobs = 0; + this.max = 0; + this.formats = {}; + this.wsproto = null; + if (tls) { + this.wsproto = "wss"; + } else { + this.wsproto = "ws"; + } + this.sockurl = `${this.wsproto}://${host}/sock`; + this.conn = new WebSocket(this.sockurl, { + headers: { + "Authentication": auth && auth !== "" ? auth : undefined + } + }); + let httpproto; + if (tls) { + httpproto = "https"; + } else { + httpproto = "http"; + } + this.httpurl = `${httpproto}://${host}/image`; + this.conn.on("message", (msg) => this.onMessage(msg)); + this.conn.once("error", (err) => this.onError(err)); + this.conn.once("close", () => this.onClose()); + } + + onMessage(msg) { + const op = msg.readUint8(0); + if (op === Rinit) { + this.max = msg.readUint16LE(1); + this.formats = JSON.parse(msg.toString("utf8", 3)); + return; + } + const tag = msg.readUint32LE(1); + const promise = this.requests.get(tag); + this.requests.delete(tag); + if (op === Rerror) { + promise.reject(new Error(msg.slice(5, msg.length).toString())); + return; + } + promise.resolve(); + } + + onError(e) { + logger.error(e.toString()); + } + + async onClose() { + for (const promise of this.requests.values()) { + promise.reject(new Error("Request ended prematurely due to a closed connection")); + } + this.requests.clear(); + if (!this.disconnected) { + logger.warn(`Lost connection to ${this.host}, attempting to reconnect in 5 seconds...`); + await setTimeout(5000); + this.conn = new WebSocket(this.sockurl, { + headers: { + "Authentication": this.auth + } + }); + this.conn.on("message", (msg) => this.onMessage(msg)); + this.conn.once("error", (err) => this.onError(err)); + this.conn.once("close", () => this.onClose()); + } + this.disconnected = false; + } + + close() { + this.disconnected = true; + this.conn.close(); + } + + queue(jobid, jobobj) { + const str = JSON.stringify(jobobj); + const buf = Buffer.alloc(4 + str.length); + buf.writeUint32LE(jobid); + buf.write(str, 4); + return this.do(Tqueue, buf); + } + + wait(jobid) { + const buf = Buffer.alloc(4); + buf.writeUint32LE(jobid); + return this.do(Twait, buf); + } + + cancel(jobid) { + const buf = Buffer.alloc(4); + buf.writeUint32LE(jobid); + return this.do(Tcancel, buf); + } + + async getOutput(jobid) { + const req = await fetch(`${this.httpurl}?id=${jobid}`, { + headers: { + "Authentication": this.auth && this.auth !== "" ? this.auth : undefined + } + }); + const contentType = req.headers.get("Content-Type"); + let type; + switch (contentType) { + case "image/gif": + type = "gif"; + break; + case "image/png": + type = "png"; + break; + case "image/jpeg": + type = "jpg"; + break; + case "image/webp": + type = "webp"; + break; + } + return { buffer: Buffer.from(await req.arrayBuffer()), type }; + } + + async do(op, data) { + const buf = Buffer.alloc(1 + 4); + const tag = this.tag++; + buf.writeUint8(op); + buf.writeUint32LE(tag, 1); + this.conn.send(Buffer.concat([buf, data])); + const promise = new Promise((resolve, reject) => { + this.requests.set(tag, { resolve, reject }); + }); + return promise; + } +} + +export default ImageConnection; \ No newline at end of file diff --git a/utils/services/image.js b/utils/services/image.js index 6bcb24f..e2b9d7e 100644 --- a/utils/services/image.js +++ b/utils/services/image.js @@ -1,12 +1,11 @@ import { BaseServiceWorker } from "eris-fleet"; import * as logger from "../logger.js"; -import fetch from "node-fetch"; -import WebSocket from "ws"; import fs from "fs"; import path from "path"; import { fileURLToPath } from "url"; import { Worker } from "worker_threads"; -import { EventEmitter } from "events"; + +import ImageConnection from "../imageConnection.js"; class ImageWorker extends BaseServiceWorker { constructor(setup) { @@ -16,6 +15,7 @@ class ImageWorker extends BaseServiceWorker { this.jobs = {}; this.connections = new Map(); this.servers = JSON.parse(fs.readFileSync("./servers.json", { encoding: "utf8" })).image; + this.nextID = 0; } this.begin().then(() => this.serviceReady()); @@ -41,46 +41,18 @@ class ImageWorker extends BaseServiceWorker { } async getRunning() { - let serversLeft = this.connections.size; const statuses = []; - for (const address of this.connections.keys()) { - const connection = this.connections.get(address); - if (connection.readyState !== 0 && connection.readyState !== 1) { - serversLeft--; + for (const [address, connection] of this.connections) { + if (connection.conn.readyState !== 0 && connection.conn.readyState !== 1) { continue; } - const controller = new AbortController(); // eslint-disable-line no-undef - const timeout = setTimeout(() => { - controller.abort(); - }, 2000); - try { - const auth = this.servers.filter((val) => val.server === address)[0].auth; - const statusRequest = await fetch(`http://${address}:8080/running`, { - signal: controller.signal, - headers: { - "Authentication": auth && auth !== "" ? auth : undefined - } - }); - clearTimeout(timeout); - const status = await statusRequest.json(); - serversLeft--; - statuses.push(status); - } catch (e) { - if (e.name === "AbortError") { - serversLeft--; - continue; - } else if (e.code === "ECONNREFUSED") { - serversLeft--; - continue; - } - throw e; - } - } - if (!serversLeft) { - return statuses; - } else { - throw new Error("Loop ended before all servers could be checked"); + statuses.push({ + address, + runningJobs: connection.njobs, + max: connection.max + }); } + return statuses; } async chooseServer(ideal) { @@ -89,119 +61,27 @@ class ImageWorker extends BaseServiceWorker { return b.load - a.load; }).filter((e, i, array) => { return !(e.load < array[0].load); - }).sort((a, b) => { - return a.queued - b.queued; }); return sorted[0]; } async getIdeal() { - let serversLeft = this.connections.size; - if (serversLeft < this.servers.length) { - for (const server of this.servers) { - try { - if (!this.connections.has(server.server)) await this.connect(server.server, server.auth); - } catch (e) { - logger.error(e); - } - } - serversLeft = this.connections.size; - } const idealServers = []; - for (const address of this.connections.keys()) { - const connection = this.connections.get(address); - if (connection.readyState !== 0 && connection.readyState !== 1) { - serversLeft--; + for (const [address, connection] of this.connections) { + if (connection.conn.readyState !== 0 && connection.conn.readyState !== 1) { continue; } - const controller = new AbortController(); // eslint-disable-line no-undef - const timeout = setTimeout(() => { - controller.abort(); - }, 5000); - try { - const auth = this.servers.filter((val) => val.server === address)[0].auth; - const statusRequest = await fetch(`http://${address}:8080/status`, { - signal: controller.signal, - headers: { - "Authentication": auth && auth !== "" ? auth : undefined - } - }); - clearTimeout(timeout); - const status = await statusRequest.json(); - serversLeft--; - idealServers.push({ - addr: address, - load: status.load, - queued: status.queued - }); - } catch (e) { - if (e.name === "AbortError") { - serversLeft--; - continue; - } else if (e.code === "ECONNREFUSED") { - serversLeft--; - continue; - } - throw e; - } finally { - clearTimeout(timeout); - } - } - if (!serversLeft) { - const server = await this.chooseServer(idealServers); - return { addr: server.addr, sock: this.connections.get(server.addr) }; - } else { - throw new Error("Loop ended before all servers could be checked"); + idealServers.push({ + addr: address, + load: connection.njobs / connection.max + }); } + const server = await this.chooseServer(idealServers); + return this.connections.get(server.addr); } async connect(server, auth) { - const connection = new WebSocket(`ws://${server}:8080/sock`, { - headers: { - "Authentication": auth && auth !== "" ? auth : undefined - } - }); - connection.on("message", async (msg) => { - const opcode = msg.readUint8(0); - const req = msg.slice(37, msg.length); - const uuid = msg.slice(1, 37).toString(); - if (opcode === 0x00) { // Job queued - if (this.jobs[req]) { - this.jobs[req].event.emit("uuid", uuid); - } - } else if (opcode === 0x01) { // Job completed successfully - // the image API sends all job responses over the same socket; make sure this is ours - if (this.jobs[uuid]) { - const imageReq = await fetch(`http://${server}:8080/image?id=${uuid}`, { - headers: { - "Authentication": auth && auth !== "" ? auth : undefined - } - }); - const image = Buffer.from(await imageReq.arrayBuffer()); - // The response data is given as the file extension/ImageMagick type of the image (e.g. "png"), followed - // by a newline, followed by the image data. - - this.jobs[uuid].event.emit("image", image, imageReq.headers.get("ext")); - } - } else if (opcode === 0x02) { // Job errored - if (this.jobs[uuid]) { - this.jobs[uuid].event.emit("error", new Error(req)); - } - } - }); - connection.on("error", (e) => { - logger.error(e.toString()); - }); - connection.once("close", () => { - for (const uuid of Object.keys(this.jobs)) { - if (this.jobs[uuid].addr === server) { - this.jobs[uuid].event.emit("error", "Job ended prematurely due to a closed connection; please run your image job again"); - delete this.jobs[uuid]; - } - } - //logger.log(`Lost connection to ${server}, attempting to reconnect...`); - this.connections.delete(server); - }); + const connection = new ImageConnection(`${server}:8080`, auth); this.connections.set(server, connection); } @@ -209,70 +89,37 @@ class ImageWorker extends BaseServiceWorker { for (const connection of this.connections.values()) { connection.close(); } - for (const uuid of Object.keys(this.jobs)) { - this.jobs[uuid].event.emit("error", "Job ended prematurely (not really an error; just run your image job again)"); - delete this.jobs[uuid]; - } this.connections.clear(); return; } - async start(object, num) { - const currentServer = await this.getIdeal(); - const data = Buffer.concat([Buffer.from([0x01 /* queue job */]), Buffer.from(num.length.toString()), Buffer.from(num), Buffer.from(JSON.stringify(object))]); - currentServer.sock.send(data); - const event = new EventEmitter(); - this.jobs[num] = { event, addr: currentServer.addr }; - const uuid = await new Promise((resolve, reject) => { - event.once("uuid", (uuid) => resolve(uuid)); - event.once("error", reject); + waitForWorker(worker) { + return new Promise((resolve, reject) => { + worker.once("message", (data) => { + resolve({ + buffer: Buffer.from([...data.buffer]), + type: data.fileExtension + }); + }); + worker.once("error", reject); }); - delete this.jobs[num]; - this.jobs[uuid] = { event: event, addr: currentServer.addr }; - return { uuid: uuid, event: event }; } - run(object) { - return new Promise((resolve, reject) => { - if (process.env.API === "true") { - // Connect to best image server - const num = Math.floor(Math.random() * 100000).toString().slice(0, 5); - const timeout = setTimeout(() => { - if (this.jobs[num]) delete this.jobs[num]; - reject("The image request timed out after 25 seconds. Try uploading your image elsewhere."); - }, 25000); - this.start(object, num).then((data) => { - clearTimeout(timeout); - if (!data.event) reject("Not connected to image server"); - data.event.once("image", (image, type) => { - delete this.jobs[data.uuid]; - const payload = { - // Take just the image data - buffer: image, - type: type - }; - resolve(payload); - }); - data.event.once("error", (err) => { - delete this.jobs[data.uuid]; - reject(err); - }); - return; - }).catch(err => reject(err)); - } else { - // Called from command (not using image API) - const worker = new Worker(path.join(path.dirname(fileURLToPath(import.meta.url)), "../image-runner.js"), { - workerData: object - }); - worker.once("message", (data) => { - resolve({ - buffer: Buffer.from([...data.buffer]), - type: data.fileExtension - }); - }); - worker.once("error", reject); - } - }); + async run(object) { + if (process.env.API === "true") { + const num = this.nextID++; + const currentServer = await this.getIdeal(); + await currentServer.queue(num, object); + await currentServer.wait(num); + const output = await currentServer.getOutput(num); + return output; + } else { + // Called from command (not using image API) + const worker = new Worker(path.join(path.dirname(fileURLToPath(import.meta.url)), "../image-runner.js"), { + workerData: object + }); + return await this.waitForWorker(worker); + } } async handleCommand(data) {