From 41068ae7638fd4dfd2bc56cb67b392616680265f Mon Sep 17 00:00:00 2001 From: Essem Date: Fri, 26 Nov 2021 23:05:09 -0600 Subject: [PATCH 1/8] Initial draft for new image API design --- api/index.js | 173 +++++++++++++++++----------- utils/imageConnection.js | 163 ++++++++++++++++++++++++++ utils/services/image.js | 241 +++++++-------------------------------- 3 files changed, 314 insertions(+), 263 deletions(-) create mode 100644 utils/imageConnection.js diff --git a/api/index.js b/api/index.js index 3cad9e0..32a7869 100644 --- a/api/index.js +++ b/api/index.js @@ -4,9 +4,32 @@ import { cpus } from "os"; import { Worker } from "worker_threads"; import { join } from "path"; import { createServer } from "http"; -import ws from "ws"; +import { WebSocketServer } from "ws"; import { fileURLToPath } from "url"; import { dirname } from "path"; +import { createRequire } from "module"; + +const nodeRequire = createRequire(import.meta.url); +const magick = nodeRequire(`../build/${process.env.DEBUG && process.env.DEBUG === "true" ? "Debug" : "Release"}/image.node`); + +/* +Rerror 0x01 +Tqueue 0x02 +Rqueue 0x03 +Tcancel 0x04 +Rcancel 0x05 +Twait 0x06 +Rwait 0x07 +Rinit 0x08 +*/ +const Rerror = 0x01; +const Tqueue = 0x02; +const Rqueue = 0x03; +const Tcancel = 0x04; +const Rcancel = 0x05; +const Twait = 0x06; +const Rwait = 0x07; +const Rinit = 0x08; const start = process.hrtime(); const log = (msg, jobNum) => { @@ -23,30 +46,34 @@ class JobCache extends Map { } const jobs = new JobCache(); -// Should look like UUID : { msg: "request", num: } +// Should look like ID : { msg: "request", num: } const queue = []; -// Array of UUIDs - -import { v4 as uuidv4 } from "uuid"; +// Array of IDs const MAX_JOBS = process.env.JOBS !== "" && process.env.JOBS !== undefined ? parseInt(process.env.JOBS) : cpus().length * 4; // Completely arbitrary, should usually be some multiple of your amount of cores const PASS = process.env.PASS !== "" && process.env.PASS !== undefined ? process.env.PASS : undefined; let jobAmount = 0; -const acceptJob = (uuid, sock) => { +const acceptJob = (id, sock) => { jobAmount++; queue.shift(); - const job = jobs.get(uuid); + const job = jobs.get(id); return runJob({ - uuid: uuid, + id: id, msg: job.msg, num: job.num }, sock).then(() => { - log(`Job ${uuid} has finished`); + log(`Job ${id} has finished`); }).catch((err) => { - console.error(`Error on job ${uuid}:`, err); - jobs.delete(uuid); - sock.send(Buffer.concat([Buffer.from([0x2]), Buffer.from(uuid), Buffer.from(err.message)])); + console.error(`Error on job ${id}:`, err); + const newJob = jobs.get(id); + if (!newJob.tag) { + newJob.error = err.message; + jobs.set(id, newJob); + return; + } + jobs.delete(id); + sock.send(Buffer.concat([Buffer.from([Rerror]), newJob.tag, Buffer.from(err.message)])); }).finally(() => { jobAmount--; if (queue.length > 0) { @@ -55,42 +82,63 @@ const acceptJob = (uuid, sock) => { }); }; -const wss = new ws.Server({ clientTracking: true, noServer: true }); +const wss = new WebSocketServer({ clientTracking: true, noServer: true }); wss.on("connection", (ws, request) => { log(`WS client ${request.socket.remoteAddress}:${request.socket.remotePort} has connected`); + ws.on("open", () => { + const init = Buffer.concat([Buffer.from([Rinit]), Buffer.from(MAX_JOBS), Buffer.from(JSON.stringify(Object.keys(magick)))]); + ws.send(init); + }); + ws.on("error", (err) => { console.error(err); }); ws.on("message", (msg) => { const opcode = msg.readUint8(0); - const req = msg.toString().slice(1,msg.length); - console.log(req); - // 0x00 == Cancel job - // 0x01 == Queue job - if (opcode == 0x00) { - delete queue[queue.indexOf(req) - 1]; - jobs.delete(req); - } else if (opcode == 0x01) { - const length = parseInt(req.slice(0, 1)); - const num = req.slice(1, length + 1); - const obj = req.slice(length + 1); + const tag = msg.slice(1, 5); + const req = msg.toString().slice(5, msg.length); + if (opcode == Tqueue) { + const id = msg.readUInt32LE(5); + const obj = msg.slice(9, msg.length); const job = { msg: obj, num: jobAmount }; - const uuid = uuidv4(); - jobs.set(uuid, job); - queue.push(uuid); + jobs.set(id, job); + queue.push(id); - const newBuffer = Buffer.concat([Buffer.from([0x00]), Buffer.from(uuid), Buffer.from(num)]); + const newBuffer = Buffer.concat([Buffer.from([Rqueue]), tag]); ws.send(newBuffer); if (jobAmount < MAX_JOBS) { - log(`Got WS request for job ${job.msg} with id ${uuid}`, job.num); - acceptJob(uuid, ws); + log(`Got WS request for job ${job.msg} with id ${id}`, job.num); + acceptJob(id, ws); } else { - log(`Got WS request for job ${job.msg} with id ${uuid}, queued in position ${queue.indexOf(uuid)}`, job.num); + log(`Got WS request for job ${job.msg} with id ${id}, queued in position ${queue.indexOf(id)}`, job.num); } + } else if (opcode == Tcancel) { + delete queue[queue.indexOf(req) - 1]; + jobs.delete(req); + const cancelResponse = Buffer.concat([Buffer.from([Rcancel]), tag]); + ws.send(cancelResponse); + } else if (opcode == Twait) { + const id = msg.readUInt32LE(5); + const job = jobs.get(id); + if (!job) { + const errorResponse = Buffer.concat([Buffer.from([Rerror]), tag, Buffer.from("Invalid job ID")]); + ws.send(errorResponse); + return; + } + if (job.error) { + jobs.delete(id); + const errorResponse = Buffer.concat([Buffer.from([Rerror]), tag, Buffer.from(job.error)]); + ws.send(errorResponse); + return; + } + job.tag = tag; + jobs.set(id, job); + //const waitResponse = Buffer.concat([Buffer.from([Rwait]), tag]); + //ws.send(waitResponse); } else { log("Could not parse WS message"); } @@ -117,41 +165,35 @@ httpServer.on("request", async (req, res) => { return res.end("401 Unauthorized"); } const reqUrl = new URL(req.url, `http://${req.headers.host}`); - if (reqUrl.pathname === "/status" && req.method === "GET") { - log(`Sending server status to ${req.socket.remoteAddress}:${req.socket.remotePort} via HTTP`); - const statusObject = { - load: MAX_JOBS - jobAmount, - queued: queue.length - }; - return res.end(JSON.stringify(statusObject)); - } else if (reqUrl.pathname === "/running" && req.method === "GET") { - log(`Sending currently running jobs to ${req.socket.remoteAddress}:${req.socket.remotePort} via HTTP`); - const keys = jobs.keys(); - const newObject = { queued: queue.length, runningJobs: jobAmount, max: MAX_JOBS }; - for (const key of keys) { - const validKeys = Object.keys(jobs.get(key)).filter((value) => value !== "addr" && value !== "port" && value !== "data" && value !== "ext"); - newObject[key] = {}; - for (const validKey of validKeys) { - if (validKey === "msg") { - newObject[key][validKey] = JSON.parse(jobs.get(key)[validKey]); - } else { - newObject[key][validKey] = jobs.get(key)[validKey]; - } - } - } - return res.end(JSON.stringify(newObject)); - } else if (reqUrl.pathname === "/image" && req.method === "GET") { + if (reqUrl.pathname === "/image" && req.method === "GET") { if (!reqUrl.searchParams.has("id")) { res.statusCode = 400; return res.end("400 Bad Request"); } - const id = reqUrl.searchParams.get("id"); + const id = parseInt(reqUrl.searchParams.get("id")); if (!jobs.has(id)) { res.statusCode = 410; return res.end("410 Gone"); } log(`Sending image data for job ${id} to ${req.socket.remoteAddress}:${req.socket.remotePort} via HTTP`); - res.setHeader("ext", jobs.get(id).ext); + const ext = jobs.get(id).ext; + let contentType; + switch (ext) { + case "gif": + contentType = "image/gif"; + break; + case "png": + contentType = "image/png"; + break; + case "jpeg": + case "jpg": + contentType = "image/jpeg"; + break; + case "webp": + contentType = "image/webp"; + break; + } + res.setHeader("Content-Type", contentType); const data = jobs.get(id).data; jobs.delete(id); return res.end(data, (err) => { @@ -189,9 +231,9 @@ httpServer.listen(8080, () => { log("HTTP and WS listening on port 8080"); }); -const runJob = (job, sock) => { +const runJob = (job, ws) => { return new Promise((resolve, reject) => { - log(`Job ${job.uuid} starting...`, job.num); + log(`Job ${job.id} starting...`, job.num); const object = JSON.parse(job.msg); // If the image has a path, it must also have a type @@ -206,17 +248,16 @@ const runJob = (job, sock) => { worker.terminate(); reject(new Error("Job timed out")); }, 900000); - log(`Job ${job.uuid} started`, job.num); + log(`Job ${job.id} started`, job.num); worker.once("message", (data) => { clearTimeout(timeout); - log(`Sending result of job ${job.uuid} back to the bot`, job.num); - const jobObject = jobs.get(job.uuid); + log(`Sending result of job ${job.id} back to the bot`, job.num); + const jobObject = jobs.get(job.id); jobObject.data = data.buffer; jobObject.ext = data.fileExtension; - jobs.set(job.uuid, jobObject); - sock.send(Buffer.concat([Buffer.from([0x1]), Buffer.from(job.uuid)]), () => { - return resolve(); - }); + jobs.set(job.id, jobObject); + const waitResponse = Buffer.concat([Buffer.from([Rwait]), jobObject.tag]); + ws.send(waitResponse); }); worker.once("error", (e) => { clearTimeout(timeout); diff --git a/utils/imageConnection.js b/utils/imageConnection.js new file mode 100644 index 0000000..4181625 --- /dev/null +++ b/utils/imageConnection.js @@ -0,0 +1,163 @@ +import fetch from "node-fetch"; +import WebSocket from "ws"; +import * as logger from "./logger.js"; +import { setTimeout } from "timers/promises"; + +/* +Rerror 0x01 +Tqueue 0x02 +Rqueue 0x03 +Tcancel 0x04 +Rcancel 0x05 +Twait 0x06 +Rwait 0x07 +Rinit 0x08 +*/ +const Rerror = 0x01; +const Tqueue = 0x02; +//const Rqueue = 0x03; +const Tcancel = 0x04; +//const Rcancel = 0x05; +const Twait = 0x06; +//const Rwait = 0x07; +const Rinit = 0x08; + +class ImageConnection { + constructor(host, auth, tls = false) { + this.requests = new Map(); + this.host = host; + this.auth = auth; + this.tag = null; + this.disconnected = false; + this.njobs = 0; + this.max = 0; + this.formats = {}; + this.wsproto = null; + if (tls) { + this.wsproto = "wss"; + } else { + this.wsproto = "ws"; + } + this.sockurl = `${this.wsproto}://${host}/sock`; + this.conn = new WebSocket(this.sockurl, { + headers: { + "Authentication": auth && auth !== "" ? auth : undefined + } + }); + let httpproto; + if (tls) { + httpproto = "https"; + } else { + httpproto = "http"; + } + this.httpurl = `${httpproto}://${host}/image`; + this.conn.on("message", (msg) => this.onMessage(msg)); + this.conn.once("error", (err) => this.onError(err)); + this.conn.once("close", () => this.onClose()); + } + + onMessage(msg) { + const op = msg.readUint8(0); + if (op === Rinit) { + this.max = msg.readUint16LE(1); + this.formats = JSON.parse(msg.toString("utf8", 3)); + return; + } + const tag = msg.readUint32LE(1); + const promise = this.requests.get(tag); + this.requests.delete(tag); + if (op === Rerror) { + promise.reject(new Error(msg.slice(5, msg.length).toString())); + return; + } + promise.resolve(); + } + + onError(e) { + logger.error(e.toString()); + } + + async onClose() { + for (const promise of this.requests.values()) { + promise.reject(new Error("Request ended prematurely due to a closed connection")); + } + this.requests.clear(); + if (!this.disconnected) { + logger.warn(`Lost connection to ${this.host}, attempting to reconnect in 5 seconds...`); + await setTimeout(5000); + this.conn = new WebSocket(this.sockurl, { + headers: { + "Authentication": this.auth + } + }); + this.conn.on("message", (msg) => this.onMessage(msg)); + this.conn.once("error", (err) => this.onError(err)); + this.conn.once("close", () => this.onClose()); + } + this.disconnected = false; + } + + close() { + this.disconnected = true; + this.conn.close(); + } + + queue(jobid, jobobj) { + const str = JSON.stringify(jobobj); + const buf = Buffer.alloc(4 + str.length); + buf.writeUint32LE(jobid); + buf.write(str, 4); + return this.do(Tqueue, buf); + } + + wait(jobid) { + const buf = Buffer.alloc(4); + buf.writeUint32LE(jobid); + return this.do(Twait, buf); + } + + cancel(jobid) { + const buf = Buffer.alloc(4); + buf.writeUint32LE(jobid); + return this.do(Tcancel, buf); + } + + async getOutput(jobid) { + const req = await fetch(`${this.httpurl}?id=${jobid}`, { + headers: { + "Authentication": this.auth && this.auth !== "" ? this.auth : undefined + } + }); + const contentType = req.headers.get("Content-Type"); + let type; + switch (contentType) { + case "image/gif": + type = "gif"; + break; + case "image/png": + type = "png"; + break; + case "image/jpeg": + type = "jpg"; + break; + case "image/webp": + type = "webp"; + break; + } + return { buffer: Buffer.from(await req.arrayBuffer()), type }; + } + + async do(op, data) { + const buf = Buffer.alloc(1 + 4); + const tag = this.tag++; + buf.writeUint8(op); + buf.writeUint32LE(tag, 1); + this.conn.send(Buffer.concat([buf, data])); + const promise = new Promise((resolve, reject) => { + this.requests.set(tag, { resolve, reject }); + }); + return promise; + } +} + +export default ImageConnection; \ No newline at end of file diff --git a/utils/services/image.js b/utils/services/image.js index 6bcb24f..e2b9d7e 100644 --- a/utils/services/image.js +++ b/utils/services/image.js @@ -1,12 +1,11 @@ import { BaseServiceWorker } from "eris-fleet"; import * as logger from "../logger.js"; -import fetch from "node-fetch"; -import WebSocket from "ws"; import fs from "fs"; import path from "path"; import { fileURLToPath } from "url"; import { Worker } from "worker_threads"; -import { EventEmitter } from "events"; + +import ImageConnection from "../imageConnection.js"; class ImageWorker extends BaseServiceWorker { constructor(setup) { @@ -16,6 +15,7 @@ class ImageWorker extends BaseServiceWorker { this.jobs = {}; this.connections = new Map(); this.servers = JSON.parse(fs.readFileSync("./servers.json", { encoding: "utf8" })).image; + this.nextID = 0; } this.begin().then(() => this.serviceReady()); @@ -41,46 +41,18 @@ class ImageWorker extends BaseServiceWorker { } async getRunning() { - let serversLeft = this.connections.size; const statuses = []; - for (const address of this.connections.keys()) { - const connection = this.connections.get(address); - if (connection.readyState !== 0 && connection.readyState !== 1) { - serversLeft--; + for (const [address, connection] of this.connections) { + if (connection.conn.readyState !== 0 && connection.conn.readyState !== 1) { continue; } - const controller = new AbortController(); // eslint-disable-line no-undef - const timeout = setTimeout(() => { - controller.abort(); - }, 2000); - try { - const auth = this.servers.filter((val) => val.server === address)[0].auth; - const statusRequest = await fetch(`http://${address}:8080/running`, { - signal: controller.signal, - headers: { - "Authentication": auth && auth !== "" ? auth : undefined - } - }); - clearTimeout(timeout); - const status = await statusRequest.json(); - serversLeft--; - statuses.push(status); - } catch (e) { - if (e.name === "AbortError") { - serversLeft--; - continue; - } else if (e.code === "ECONNREFUSED") { - serversLeft--; - continue; - } - throw e; - } - } - if (!serversLeft) { - return statuses; - } else { - throw new Error("Loop ended before all servers could be checked"); + statuses.push({ + address, + runningJobs: connection.njobs, + max: connection.max + }); } + return statuses; } async chooseServer(ideal) { @@ -89,119 +61,27 @@ class ImageWorker extends BaseServiceWorker { return b.load - a.load; }).filter((e, i, array) => { return !(e.load < array[0].load); - }).sort((a, b) => { - return a.queued - b.queued; }); return sorted[0]; } async getIdeal() { - let serversLeft = this.connections.size; - if (serversLeft < this.servers.length) { - for (const server of this.servers) { - try { - if (!this.connections.has(server.server)) await this.connect(server.server, server.auth); - } catch (e) { - logger.error(e); - } - } - serversLeft = this.connections.size; - } const idealServers = []; - for (const address of this.connections.keys()) { - const connection = this.connections.get(address); - if (connection.readyState !== 0 && connection.readyState !== 1) { - serversLeft--; + for (const [address, connection] of this.connections) { + if (connection.conn.readyState !== 0 && connection.conn.readyState !== 1) { continue; } - const controller = new AbortController(); // eslint-disable-line no-undef - const timeout = setTimeout(() => { - controller.abort(); - }, 5000); - try { - const auth = this.servers.filter((val) => val.server === address)[0].auth; - const statusRequest = await fetch(`http://${address}:8080/status`, { - signal: controller.signal, - headers: { - "Authentication": auth && auth !== "" ? auth : undefined - } - }); - clearTimeout(timeout); - const status = await statusRequest.json(); - serversLeft--; - idealServers.push({ - addr: address, - load: status.load, - queued: status.queued - }); - } catch (e) { - if (e.name === "AbortError") { - serversLeft--; - continue; - } else if (e.code === "ECONNREFUSED") { - serversLeft--; - continue; - } - throw e; - } finally { - clearTimeout(timeout); - } - } - if (!serversLeft) { - const server = await this.chooseServer(idealServers); - return { addr: server.addr, sock: this.connections.get(server.addr) }; - } else { - throw new Error("Loop ended before all servers could be checked"); + idealServers.push({ + addr: address, + load: connection.njobs / connection.max + }); } + const server = await this.chooseServer(idealServers); + return this.connections.get(server.addr); } async connect(server, auth) { - const connection = new WebSocket(`ws://${server}:8080/sock`, { - headers: { - "Authentication": auth && auth !== "" ? auth : undefined - } - }); - connection.on("message", async (msg) => { - const opcode = msg.readUint8(0); - const req = msg.slice(37, msg.length); - const uuid = msg.slice(1, 37).toString(); - if (opcode === 0x00) { // Job queued - if (this.jobs[req]) { - this.jobs[req].event.emit("uuid", uuid); - } - } else if (opcode === 0x01) { // Job completed successfully - // the image API sends all job responses over the same socket; make sure this is ours - if (this.jobs[uuid]) { - const imageReq = await fetch(`http://${server}:8080/image?id=${uuid}`, { - headers: { - "Authentication": auth && auth !== "" ? auth : undefined - } - }); - const image = Buffer.from(await imageReq.arrayBuffer()); - // The response data is given as the file extension/ImageMagick type of the image (e.g. "png"), followed - // by a newline, followed by the image data. - - this.jobs[uuid].event.emit("image", image, imageReq.headers.get("ext")); - } - } else if (opcode === 0x02) { // Job errored - if (this.jobs[uuid]) { - this.jobs[uuid].event.emit("error", new Error(req)); - } - } - }); - connection.on("error", (e) => { - logger.error(e.toString()); - }); - connection.once("close", () => { - for (const uuid of Object.keys(this.jobs)) { - if (this.jobs[uuid].addr === server) { - this.jobs[uuid].event.emit("error", "Job ended prematurely due to a closed connection; please run your image job again"); - delete this.jobs[uuid]; - } - } - //logger.log(`Lost connection to ${server}, attempting to reconnect...`); - this.connections.delete(server); - }); + const connection = new ImageConnection(`${server}:8080`, auth); this.connections.set(server, connection); } @@ -209,70 +89,37 @@ class ImageWorker extends BaseServiceWorker { for (const connection of this.connections.values()) { connection.close(); } - for (const uuid of Object.keys(this.jobs)) { - this.jobs[uuid].event.emit("error", "Job ended prematurely (not really an error; just run your image job again)"); - delete this.jobs[uuid]; - } this.connections.clear(); return; } - async start(object, num) { - const currentServer = await this.getIdeal(); - const data = Buffer.concat([Buffer.from([0x01 /* queue job */]), Buffer.from(num.length.toString()), Buffer.from(num), Buffer.from(JSON.stringify(object))]); - currentServer.sock.send(data); - const event = new EventEmitter(); - this.jobs[num] = { event, addr: currentServer.addr }; - const uuid = await new Promise((resolve, reject) => { - event.once("uuid", (uuid) => resolve(uuid)); - event.once("error", reject); + waitForWorker(worker) { + return new Promise((resolve, reject) => { + worker.once("message", (data) => { + resolve({ + buffer: Buffer.from([...data.buffer]), + type: data.fileExtension + }); + }); + worker.once("error", reject); }); - delete this.jobs[num]; - this.jobs[uuid] = { event: event, addr: currentServer.addr }; - return { uuid: uuid, event: event }; } - run(object) { - return new Promise((resolve, reject) => { - if (process.env.API === "true") { - // Connect to best image server - const num = Math.floor(Math.random() * 100000).toString().slice(0, 5); - const timeout = setTimeout(() => { - if (this.jobs[num]) delete this.jobs[num]; - reject("The image request timed out after 25 seconds. Try uploading your image elsewhere."); - }, 25000); - this.start(object, num).then((data) => { - clearTimeout(timeout); - if (!data.event) reject("Not connected to image server"); - data.event.once("image", (image, type) => { - delete this.jobs[data.uuid]; - const payload = { - // Take just the image data - buffer: image, - type: type - }; - resolve(payload); - }); - data.event.once("error", (err) => { - delete this.jobs[data.uuid]; - reject(err); - }); - return; - }).catch(err => reject(err)); - } else { - // Called from command (not using image API) - const worker = new Worker(path.join(path.dirname(fileURLToPath(import.meta.url)), "../image-runner.js"), { - workerData: object - }); - worker.once("message", (data) => { - resolve({ - buffer: Buffer.from([...data.buffer]), - type: data.fileExtension - }); - }); - worker.once("error", reject); - } - }); + async run(object) { + if (process.env.API === "true") { + const num = this.nextID++; + const currentServer = await this.getIdeal(); + await currentServer.queue(num, object); + await currentServer.wait(num); + const output = await currentServer.getOutput(num); + return output; + } else { + // Called from command (not using image API) + const worker = new Worker(path.join(path.dirname(fileURLToPath(import.meta.url)), "../image-runner.js"), { + workerData: object + }); + return await this.waitForWorker(worker); + } } async handleCommand(data) { From fe321e00b49977cd4154338d5e4e6bbdc06edf03 Mon Sep 17 00:00:00 2001 From: Essem Date: Fri, 26 Nov 2021 23:24:13 -0600 Subject: [PATCH 2/8] Some fixes --- api/index.js | 9 ++++----- commands/general/imagestats.js | 2 +- utils/imageConnection.js | 9 ++++++--- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/api/index.js b/api/index.js index 32a7869..8602063 100644 --- a/api/index.js +++ b/api/index.js @@ -86,11 +86,10 @@ const wss = new WebSocketServer({ clientTracking: true, noServer: true }); wss.on("connection", (ws, request) => { log(`WS client ${request.socket.remoteAddress}:${request.socket.remotePort} has connected`); - - ws.on("open", () => { - const init = Buffer.concat([Buffer.from([Rinit]), Buffer.from(MAX_JOBS), Buffer.from(JSON.stringify(Object.keys(magick)))]); - ws.send(init); - }); + const num = Buffer.alloc(2); + num.writeUInt16LE(MAX_JOBS); + const init = Buffer.concat([Buffer.from([Rinit]), num, Buffer.from(JSON.stringify(Object.keys(magick)))]); + ws.send(init); ws.on("error", (err) => { console.error(err); diff --git a/commands/general/imagestats.js b/commands/general/imagestats.js index eace592..d2cbd32 100644 --- a/commands/general/imagestats.js +++ b/commands/general/imagestats.js @@ -18,7 +18,7 @@ class ImageStatsCommand extends Command { for (let i = 0; i < servers.length; i++) { embed.embeds[0].fields.push({ name: `Server ${i + 1}`, - value: `Running Jobs: ${servers[i].runningJobs}\nQueued: ${servers[i].queued}\nMax Jobs: ${servers[i].max}` + value: `Running Jobs: ${servers[i].runningJobs}\nQueued: ${Math.max(0, servers[i].runningJobs - servers[i].max)}\nMax Jobs: ${servers[i].max}` }); } return embed; diff --git a/utils/imageConnection.js b/utils/imageConnection.js index 4181625..04e3e4c 100644 --- a/utils/imageConnection.js +++ b/utils/imageConnection.js @@ -15,11 +15,11 @@ Rinit 0x08 */ const Rerror = 0x01; const Tqueue = 0x02; -//const Rqueue = 0x03; +const Rqueue = 0x03; const Tcancel = 0x04; -//const Rcancel = 0x05; +const Rcancel = 0x05; const Twait = 0x06; -//const Rwait = 0x07; +const Rwait = 0x07; const Rinit = 0x08; class ImageConnection { @@ -66,7 +66,10 @@ class ImageConnection { const tag = msg.readUint32LE(1); const promise = this.requests.get(tag); this.requests.delete(tag); + if (op === Rqueue) this.njobs++; + if (op === Rcancel || op === Rwait) this.njobs--; if (op === Rerror) { + this.njobs--; promise.reject(new Error(msg.slice(5, msg.length).toString())); return; } From 41992769efd5952031d06529e75c0a85de57f136 Mon Sep 17 00:00:00 2001 From: Essem Date: Sat, 27 Nov 2021 16:45:33 -0600 Subject: [PATCH 3/8] god dammit --- api/index.js | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/api/index.js b/api/index.js index 8602063..e9193f5 100644 --- a/api/index.js +++ b/api/index.js @@ -257,23 +257,11 @@ const runJob = (job, ws) => { jobs.set(job.id, jobObject); const waitResponse = Buffer.concat([Buffer.from([Rwait]), jobObject.tag]); ws.send(waitResponse); + resolve(); }); worker.once("error", (e) => { clearTimeout(timeout); reject(e); }); - /*run(object).then((data) => { - log(`Sending result of job ${job.uuid} back to the bot`, job.num); - const jobObject = jobs.get(job.uuid); - jobObject.data = data.buffer; - jobObject.ext = data.fileExtension; - jobs.set(job.uuid, jobObject); - sock.write(Buffer.concat([Buffer.from([0x1]), Buffer.from(job.uuid)]), (e) => { - if (e) return reject(e); - return resolve(); - }); - }).catch(e => { - reject(e); - });*/ }); }; \ No newline at end of file From 19638966bb9dcf2cb7181f7055ad93eb4d2b395f Mon Sep 17 00:00:00 2001 From: Essem Date: Sat, 27 Nov 2021 17:13:12 -0600 Subject: [PATCH 4/8] Add docs for API --- api/IMPLEMENTATION.md | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 api/IMPLEMENTATION.md diff --git a/api/IMPLEMENTATION.md b/api/IMPLEMENTATION.md new file mode 100644 index 0000000..c039bd6 --- /dev/null +++ b/api/IMPLEMENTATION.md @@ -0,0 +1,33 @@ +# esmBot Image API +The esmBot image API is a combined HTTP and WebSocket API running on port 8080. The API supports very basic authentication, which is defined on the server via the PASS environment variable and is sent from the client via the Authentication header in both HTTP and WS requests. + +## HTTP + +### GET `/image/?id=` +Get image data after job is finished running. The Content-Type header is properly set. + +## WebSockets +A client sends *requests* (T-messages) to a server, which subsequently *replies* (R-messages) to the client. +### Message IDs +- Rerror 0x01 +- Tqueue 0x02 +- Rqueue 0x03 +- Tcancel 0x04 +- Rcancel 0x05 +- Twait 0x06 +- Rwait 0x07 +- Rinit 0x08 + +### Messages +[n] means n bytes. +[s] means a string that goes until the end of the message. +[j] means JSON data that goes until the end of the message. +`tag` is used to identify a request/response pair, like `lock` in the original API. `jid` is used to identify a job. `job` is a job object. +- Rerror tag[4] error[s] +- Tqueue tag[4] jid[4] job[j] +- Rqueue tag[4] +- Tcancel tag[4] jid[4] +- Rcancel tag[4] +- Twait tag[4] jid[4] +- Rwait tag[4] +- Rinit max_jobs[2] formats[j] From 871979105cf9f349db3e468b2c897889364d2a1b Mon Sep 17 00:00:00 2001 From: Essem Date: Mon, 29 Nov 2021 15:27:13 -0600 Subject: [PATCH 5/8] Fixed tag parameter stuff, added tag to Rinit, added format checking --- api/IMPLEMENTATION.md | 16 ++++++++-------- api/index.js | 16 ++++++++++------ utils/image.js | 2 +- utils/imageConnection.js | 17 ++++++++--------- utils/services/image.js | 5 +++-- 5 files changed, 30 insertions(+), 26 deletions(-) diff --git a/api/IMPLEMENTATION.md b/api/IMPLEMENTATION.md index c039bd6..36ab559 100644 --- a/api/IMPLEMENTATION.md +++ b/api/IMPLEMENTATION.md @@ -23,11 +23,11 @@ A client sends *requests* (T-messages) to a server, which subsequently *replies* [s] means a string that goes until the end of the message. [j] means JSON data that goes until the end of the message. `tag` is used to identify a request/response pair, like `lock` in the original API. `jid` is used to identify a job. `job` is a job object. -- Rerror tag[4] error[s] -- Tqueue tag[4] jid[4] job[j] -- Rqueue tag[4] -- Tcancel tag[4] jid[4] -- Rcancel tag[4] -- Twait tag[4] jid[4] -- Rwait tag[4] -- Rinit max_jobs[2] formats[j] +- Rerror tag[2] error[s] +- Tqueue tag[2] jid[4] job[j] +- Rqueue tag[2] +- Tcancel tag[2] jid[4] +- Rcancel tag[2] +- Twait tag[2] jid[4] +- Rwait tag[2] +- Rinit tag[2] max_jobs[2] formats[j] diff --git a/api/index.js b/api/index.js index e9193f5..f1ca5fb 100644 --- a/api/index.js +++ b/api/index.js @@ -88,7 +88,11 @@ wss.on("connection", (ws, request) => { log(`WS client ${request.socket.remoteAddress}:${request.socket.remotePort} has connected`); const num = Buffer.alloc(2); num.writeUInt16LE(MAX_JOBS); - const init = Buffer.concat([Buffer.from([Rinit]), num, Buffer.from(JSON.stringify(Object.keys(magick)))]); + const formats = {}; + for (const cmd of Object.keys(magick)) { + formats[cmd] = ["image/png", "image/gif", "image/jpeg", "image/webp"]; + } + const init = Buffer.concat([Buffer.from([Rinit]), Buffer.from([0x00, 0x00]), num, Buffer.from(JSON.stringify(formats))]); ws.send(init); ws.on("error", (err) => { @@ -97,11 +101,11 @@ wss.on("connection", (ws, request) => { ws.on("message", (msg) => { const opcode = msg.readUint8(0); - const tag = msg.slice(1, 5); - const req = msg.toString().slice(5, msg.length); + const tag = msg.slice(1, 3); + const req = msg.toString().slice(3); if (opcode == Tqueue) { - const id = msg.readUInt32LE(5); - const obj = msg.slice(9, msg.length); + const id = msg.readUInt32LE(3); + const obj = msg.slice(7); const job = { msg: obj, num: jobAmount }; jobs.set(id, job); queue.push(id); @@ -121,7 +125,7 @@ wss.on("connection", (ws, request) => { const cancelResponse = Buffer.concat([Buffer.from([Rcancel]), tag]); ws.send(cancelResponse); } else if (opcode == Twait) { - const id = msg.readUInt32LE(5); + const id = msg.readUInt32LE(3); const job = jobs.get(id); if (!job) { const errorResponse = Buffer.concat([Buffer.from([Rerror]), tag, Buffer.from("Invalid job ID")]); diff --git a/utils/image.js b/utils/image.js index 6ab807a..1486b60 100644 --- a/utils/image.js +++ b/utils/image.js @@ -2,7 +2,7 @@ import fetch from "node-fetch"; import fs from "fs"; import fileType from "file-type"; -const formats = ["image/jpeg", "image/png", "image/webp", "image/gif", "video/mp4", "video/webm", "video/mov"]; +const formats = ["image/jpeg", "image/png", "image/webp", "image/gif", "video/mp4", "video/webm", "video/quicktime"]; export const jobs = {}; diff --git a/utils/imageConnection.js b/utils/imageConnection.js index 04e3e4c..b1b87b4 100644 --- a/utils/imageConnection.js +++ b/utils/imageConnection.js @@ -59,18 +59,18 @@ class ImageConnection { onMessage(msg) { const op = msg.readUint8(0); if (op === Rinit) { - this.max = msg.readUint16LE(1); - this.formats = JSON.parse(msg.toString("utf8", 3)); + this.max = msg.readUint16LE(3); + this.formats = JSON.parse(msg.toString("utf8", 5)); return; } - const tag = msg.readUint32LE(1); + const tag = msg.readUint16LE(1); const promise = this.requests.get(tag); this.requests.delete(tag); if (op === Rqueue) this.njobs++; if (op === Rcancel || op === Rwait) this.njobs--; if (op === Rerror) { this.njobs--; - promise.reject(new Error(msg.slice(5, msg.length).toString())); + promise.reject(new Error(msg.slice(3, msg.length).toString())); return; } promise.resolve(); @@ -107,10 +107,9 @@ class ImageConnection { queue(jobid, jobobj) { const str = JSON.stringify(jobobj); - const buf = Buffer.alloc(4 + str.length); + const buf = Buffer.alloc(4); buf.writeUint32LE(jobid); - buf.write(str, 4); - return this.do(Tqueue, buf); + return this.do(Tqueue, Buffer.concat([buf, Buffer.from(str)])); } wait(jobid) { @@ -151,10 +150,10 @@ class ImageConnection { } async do(op, data) { - const buf = Buffer.alloc(1 + 4); + const buf = Buffer.alloc(1 + 2); const tag = this.tag++; buf.writeUint8(op); - buf.writeUint32LE(tag, 1); + buf.writeUint16LE(tag, 1); this.conn.send(Buffer.concat([buf, data])); const promise = new Promise((resolve, reject) => { this.requests.set(tag, { resolve, reject }); diff --git a/utils/services/image.js b/utils/services/image.js index e2b9d7e..c352998 100644 --- a/utils/services/image.js +++ b/utils/services/image.js @@ -65,12 +65,13 @@ class ImageWorker extends BaseServiceWorker { return sorted[0]; } - async getIdeal() { + async getIdeal(object) { const idealServers = []; for (const [address, connection] of this.connections) { if (connection.conn.readyState !== 0 && connection.conn.readyState !== 1) { continue; } + if (!connection.formats[object.cmd].includes(object.type)) continue; idealServers.push({ addr: address, load: connection.njobs / connection.max @@ -108,7 +109,7 @@ class ImageWorker extends BaseServiceWorker { async run(object) { if (process.env.API === "true") { const num = this.nextID++; - const currentServer = await this.getIdeal(); + const currentServer = await this.getIdeal(object); await currentServer.queue(num, object); await currentServer.wait(num); const output = await currentServer.getOutput(num); From 38e5a1d9f273932821ea5bbae2b3a8c11bd79c87 Mon Sep 17 00:00:00 2001 From: Essem Date: Thu, 2 Dec 2021 18:01:33 -0600 Subject: [PATCH 6/8] Change image API port, added TLS support, fixed content-type header --- Dockerfile.api | 2 +- api/IMPLEMENTATION.md | 2 +- api/index.js | 7 ++++--- servers.json | 2 +- utils/imageConnection.js | 7 ++++--- utils/services/image.js | 2 +- 6 files changed, 12 insertions(+), 10 deletions(-) diff --git a/Dockerfile.api b/Dockerfile.api index c1437f3..975ab59 100644 --- a/Dockerfile.api +++ b/Dockerfile.api @@ -55,6 +55,6 @@ RUN npm run build USER esmBot -EXPOSE 8080 8081 +EXPOSE 3762 ENTRYPOINT ["node", "api/index.js"] \ No newline at end of file diff --git a/api/IMPLEMENTATION.md b/api/IMPLEMENTATION.md index 36ab559..eab9674 100644 --- a/api/IMPLEMENTATION.md +++ b/api/IMPLEMENTATION.md @@ -1,5 +1,5 @@ # esmBot Image API -The esmBot image API is a combined HTTP and WebSocket API running on port 8080. The API supports very basic authentication, which is defined on the server via the PASS environment variable and is sent from the client via the Authentication header in both HTTP and WS requests. +The esmBot image API is a combined HTTP and WebSocket API running on port 3762. The API supports very basic authentication, which is defined on the server via the PASS environment variable and is sent from the client via the Authentication header in both HTTP and WS requests. ## HTTP diff --git a/api/index.js b/api/index.js index f1ca5fb..9cab7d3 100644 --- a/api/index.js +++ b/api/index.js @@ -196,7 +196,8 @@ httpServer.on("request", async (req, res) => { contentType = "image/webp"; break; } - res.setHeader("Content-Type", contentType); + if (contentType) res.setHeader("Content-Type", contentType); + else res.setHeader("Content-Type", ext); const data = jobs.get(id).data; jobs.delete(id); return res.end(data, (err) => { @@ -230,8 +231,8 @@ httpServer.on("error", (e) => { console.error("An HTTP error occurred: ", e); }); -httpServer.listen(8080, () => { - log("HTTP and WS listening on port 8080"); +httpServer.listen(3762, () => { + log("HTTP and WS listening on port 3762"); }); const runJob = (job, ws) => { diff --git a/servers.json b/servers.json index 8a2d11f..c138d49 100644 --- a/servers.json +++ b/servers.json @@ -3,7 +3,7 @@ { "id": "1", "host": "localhost", "port": 2333, "password": "youshallnotpass", "local": true } ], "image": [ - { "server": "localhost", "auth": "verycoolpass100" } + { "server": "localhost", "auth": "verycoolpass100", "tls": false } ], "searx": [ "https://searx.xyz" diff --git a/utils/imageConnection.js b/utils/imageConnection.js index b1b87b4..5873168 100644 --- a/utils/imageConnection.js +++ b/utils/imageConnection.js @@ -26,6 +26,7 @@ class ImageConnection { constructor(host, auth, tls = false) { this.requests = new Map(); this.host = host; + this.port = 3762; this.auth = auth; this.tag = null; this.disconnected = false; @@ -38,7 +39,7 @@ class ImageConnection { } else { this.wsproto = "ws"; } - this.sockurl = `${this.wsproto}://${host}/sock`; + this.sockurl = `${this.wsproto}://${host}:${this.port}/sock`; this.conn = new WebSocket(this.sockurl, { headers: { "Authentication": auth && auth !== "" ? auth : undefined @@ -50,7 +51,7 @@ class ImageConnection { } else { httpproto = "http"; } - this.httpurl = `${httpproto}://${host}/image`; + this.httpurl = `${httpproto}://${host}:${this.port}/image`; this.conn.on("message", (msg) => this.onMessage(msg)); this.conn.once("error", (err) => this.onError(err)); this.conn.once("close", () => this.onClose()); @@ -86,7 +87,7 @@ class ImageConnection { } this.requests.clear(); if (!this.disconnected) { - logger.warn(`Lost connection to ${this.host}, attempting to reconnect in 5 seconds...`); + logger.warn(`Lost connection to ${this.host}:${this.port}, attempting to reconnect in 5 seconds...`); await setTimeout(5000); this.conn = new WebSocket(this.sockurl, { headers: { diff --git a/utils/services/image.js b/utils/services/image.js index c352998..5200e27 100644 --- a/utils/services/image.js +++ b/utils/services/image.js @@ -82,7 +82,7 @@ class ImageWorker extends BaseServiceWorker { } async connect(server, auth) { - const connection = new ImageConnection(`${server}:8080`, auth); + const connection = new ImageConnection(server, auth); this.connections.set(server, connection); } From 30bcb7a969f5104f5ef0fbd2c2bce931ba89938a Mon Sep 17 00:00:00 2001 From: Essem Date: Mon, 6 Dec 2021 01:25:38 -0600 Subject: [PATCH 7/8] Added extra font support to some other commands, handle overflow on image api --- commands/image-editing/caption.js | 2 +- commands/image-editing/caption2.js | 12 +++++++++--- commands/image-editing/meme.js | 8 +++++++- commands/image-editing/motivate.js | 9 ++++++++- natives/caption2.cc | 6 +++++- natives/meme.cc | 6 +++++- natives/motivate.cc | 6 +++++- utils/imageConnection.js | 3 ++- utils/services/image.js | 3 ++- 9 files changed, 44 insertions(+), 11 deletions(-) diff --git a/commands/image-editing/caption.js b/commands/image-editing/caption.js index 755907c..246df19 100644 --- a/commands/image-editing/caption.js +++ b/commands/image-editing/caption.js @@ -1,5 +1,5 @@ import ImageCommand from "../../classes/imageCommand.js"; -const allowedFonts = ["futura", "impact", "helvetica", "arial", "roboto", "noto"]; +const allowedFonts = ["futura", "impact", "helvetica", "arial", "roboto", "noto", "times"]; class CaptionCommand extends ImageCommand { params(url) { diff --git a/commands/image-editing/caption2.js b/commands/image-editing/caption2.js index bf08302..22c578d 100644 --- a/commands/image-editing/caption2.js +++ b/commands/image-editing/caption2.js @@ -1,21 +1,27 @@ import ImageCommand from "../../classes/imageCommand.js"; const words = ["me irl", "dank", "follow my second account @esmBot_", "2016", "meme", "wholesome", "reddit", "instagram", "twitter", "facebook", "fortnite", "minecraft", "relatable", "gold", "funny", "template", "hilarious", "memes", "deep fried", "2020", "leafy", "pewdiepie"]; +const allowedFonts = ["futura", "impact", "helvetica", "arial", "roboto", "noto", "times"]; class CaptionTwoCommand extends ImageCommand { params(url) { const newArgs = this.args.filter(item => !item.includes(url)); return { caption: newArgs.length !== 0 ? newArgs.join(" ").replaceAll("&", "\\&").replaceAll(">", "\\>").replaceAll("<", "\\<").replaceAll("\"", "\\"").replaceAll("'", "\\'").replaceAll("%", "\\%") : words.sort(() => 0.5 - Math.random()).slice(0, Math.floor(Math.random() * words.length + 1)).join(" "), - top: !!this.specialArgs.top + top: !!this.specialArgs.top, + font: this.specialArgs.font && allowedFonts.includes(this.specialArgs.font.toLowerCase()) ? this.specialArgs.font.toLowerCase() : "helvetica" }; } - static description = "Adds a me.me caption/tag list to an image/GIF"; + static description = "Adds a me.me caption/tag list to an image"; static aliases = ["tags2", "meirl", "memecaption", "medotmecaption"]; static arguments = ["{text}"]; static flags = [{ name: "top", - description: "Put the caption on the top of an image/GIF instead of the bottom" + description: "Put the caption on the top of an image instead of the bottom" + }, { + name: "font", + type: allowedFonts.join("|"), + description: "Specify the font you want to use (default: `helvetica`)" }]; static noText = "You need to provide some text to add a caption!"; diff --git a/commands/image-editing/meme.js b/commands/image-editing/meme.js index ea3c42f..fb2c3cf 100644 --- a/commands/image-editing/meme.js +++ b/commands/image-editing/meme.js @@ -1,4 +1,5 @@ import ImageCommand from "../../classes/imageCommand.js"; +const allowedFonts = ["futura", "impact", "helvetica", "arial", "roboto", "noto", "times"]; class MemeCommand extends ImageCommand { params(url) { @@ -6,7 +7,8 @@ class MemeCommand extends ImageCommand { const [topText, bottomText] = newArgs.join(" ").split(/(? elem.trim()); return { top: (this.specialArgs.case ? topText : topText.toUpperCase()).replaceAll("&", "\\&").replaceAll(">", "\\>").replaceAll("<", "\\<").replaceAll("\"", "\\"").replaceAll("'", "\\'").replaceAll("%", "\\%"), - bottom: bottomText ? (this.specialArgs.case ? bottomText : bottomText.toUpperCase()).replaceAll("&", "\\&").replaceAll(">", "\\>").replaceAll("<", "\\<").replaceAll("\"", "\\"").replaceAll("'", "\\'").replaceAll("%", "\\%") : "" + bottom: bottomText ? (this.specialArgs.case ? bottomText : bottomText.toUpperCase()).replaceAll("&", "\\&").replaceAll(">", "\\>").replaceAll("<", "\\<").replaceAll("\"", "\\"").replaceAll("'", "\\'").replaceAll("%", "\\%") : "", + font: this.specialArgs.font && allowedFonts.includes(this.specialArgs.font.toLowerCase()) ? this.specialArgs.font.toLowerCase() : "impact" }; } @@ -15,6 +17,10 @@ class MemeCommand extends ImageCommand { static flags = [{ name: "case", description: "Make the meme text case-sensitive (allows for lowercase text)" + }, { + name: "font", + type: allowedFonts.join("|"), + description: "Specify the font you want to use (default: `impact`)" }]; static requiresText = true; diff --git a/commands/image-editing/motivate.js b/commands/image-editing/motivate.js index ab39dce..ea81520 100644 --- a/commands/image-editing/motivate.js +++ b/commands/image-editing/motivate.js @@ -1,4 +1,5 @@ import ImageCommand from "../../classes/imageCommand.js"; +const allowedFonts = ["futura", "impact", "helvetica", "arial", "roboto", "noto", "times"]; class MotivateCommand extends ImageCommand { params(url) { @@ -6,13 +7,19 @@ class MotivateCommand extends ImageCommand { const [topText, bottomText] = newArgs.join(" ").split(/(? elem.trim()); return { top: topText.replaceAll("&", "\\&").replaceAll(">", "\\>").replaceAll("<", "\\<").replaceAll("\"", "\\"").replaceAll("'", "\\'").replaceAll("%", "\\%"), - bottom: bottomText ? bottomText.replaceAll("&", "\\&").replaceAll(">", "\\>").replaceAll("<", "\\<").replaceAll("\"", "\\"").replaceAll("'", "\\'").replaceAll("%", "\\%") : "" + bottom: bottomText ? bottomText.replaceAll("&", "\\&").replaceAll(">", "\\>").replaceAll("<", "\\<").replaceAll("\"", "\\"").replaceAll("'", "\\'").replaceAll("%", "\\%") : "", + font: this.specialArgs.font && allowedFonts.includes(this.specialArgs.font.toLowerCase()) ? this.specialArgs.font.toLowerCase() : "times" }; } static description = "Generates a motivational poster"; static aliases = ["motivational", "motiv", "demotiv", "demotivational", "poster", "motivation", "demotivate"]; static arguments = ["[top text]", "{bottom text}"]; + static flags = [{ + name: "font", + type: allowedFonts.join("|"), + description: "Specify the font you want to use (default: `times`)" + }]; static requiresText = true; static noText = "You need to provide some text to generate a motivational poster!"; diff --git a/natives/caption2.cc b/natives/caption2.cc index 8ac2bf5..84c0c38 100644 --- a/natives/caption2.cc +++ b/natives/caption2.cc @@ -15,6 +15,7 @@ Napi::Value CaptionTwo(const Napi::CallbackInfo &info) { Napi::Buffer data = obj.Get("data").As>(); string caption = obj.Get("caption").As().Utf8Value(); bool top = obj.Get("top").As().Value(); + string font = obj.Get("font").As().Utf8Value(); string type = obj.Get("type").As().Utf8Value(); int delay = obj.Has("delay") ? obj.Get("delay").As().Int32Value() : 0; @@ -39,7 +40,10 @@ Napi::Value CaptionTwo(const Napi::CallbackInfo &info) { caption_image.fillColor("black"); caption_image.font("Helvetica Neue"); caption_image.fontPointsize(width / 17); - caption_image.read("pango:" + caption); + caption_image.read("pango:" + caption + ""); caption_image.extent(Geometry(width, caption_image.rows() + (width / 25)), Magick::CenterGravity); diff --git a/natives/meme.cc b/natives/meme.cc index f5d5dc9..36ce25f 100644 --- a/natives/meme.cc +++ b/natives/meme.cc @@ -15,6 +15,7 @@ Napi::Value Meme(const Napi::CallbackInfo &info) { Napi::Buffer data = obj.Get("data").As>(); string top = obj.Get("top").As().Utf8Value(); string bottom = obj.Get("bottom").As().Utf8Value(); + string font = obj.Get("font").As().Utf8Value(); string type = obj.Get("type").As().Utf8Value(); int delay = obj.Has("delay") ? obj.Get("delay").As().Int32Value() : 0; @@ -43,7 +44,10 @@ Napi::Value Meme(const Napi::CallbackInfo &info) { top_text.font("Impact"); top_text.fontPointsize(width / 12); top_text.textGravity(Magick::CenterGravity); - top_text.read("pango:" + top + ""); + top_text.read("pango:" + top + ""); Image top_text_fill = top_text; top_text_fill.channel(Magick::AlphaChannel); top_text_fill.morphology(Magick::EdgeOutMorphology, "Octagon"); diff --git a/natives/motivate.cc b/natives/motivate.cc index f81804f..62feb5b 100644 --- a/natives/motivate.cc +++ b/natives/motivate.cc @@ -15,6 +15,7 @@ Napi::Value Motivate(const Napi::CallbackInfo &info) { Napi::Buffer data = obj.Get("data").As>(); string top_text = obj.Get("top").As().Utf8Value(); string bottom_text = obj.Get("bottom").As().Utf8Value(); + string font = obj.Get("font").As().Utf8Value(); string type = obj.Get("type").As().Utf8Value(); int delay = obj.Has("delay") ? obj.Get("delay").As().Int32Value() : 0; @@ -40,7 +41,10 @@ Napi::Value Motivate(const Napi::CallbackInfo &info) { top.font("Times"); top.textGravity(Magick::CenterGravity); top.fontPointsize(56); - top.read("pango:" + top_text + ""); + top.read("pango:" + top_text + ""); top.extent(Geometry(bottom_text != "" ? to_string(top.columns()) + "x" + to_string(top.rows()) : to_string(top.columns()) + "x" + diff --git a/utils/imageConnection.js b/utils/imageConnection.js index 5873168..4b71d92 100644 --- a/utils/imageConnection.js +++ b/utils/imageConnection.js @@ -152,7 +152,8 @@ class ImageConnection { async do(op, data) { const buf = Buffer.alloc(1 + 2); - const tag = this.tag++; + let tag = this.tag++; + if (tag > 65535) tag = this.tag = 0; buf.writeUint8(op); buf.writeUint16LE(tag, 1); this.conn.send(Buffer.concat([buf, data])); diff --git a/utils/services/image.js b/utils/services/image.js index 5200e27..593ee33 100644 --- a/utils/services/image.js +++ b/utils/services/image.js @@ -108,7 +108,8 @@ class ImageWorker extends BaseServiceWorker { async run(object) { if (process.env.API === "true") { - const num = this.nextID++; + let num = this.nextID++; + if (num > 4294967295) num = this.nextID = 0; const currentServer = await this.getIdeal(object); await currentServer.queue(num, object); await currentServer.wait(num); From 39ebae8cd7d02386b84a058e325a6064d349f313 Mon Sep 17 00:00:00 2001 From: Essem Date: Thu, 9 Dec 2021 22:31:52 -0600 Subject: [PATCH 8/8] Refactor image job object, fixed image commands that don't require an existing image --- api/IMPLEMENTATION.md | 15 +++++++++++++++ api/index.js | 2 +- classes/imageCommand.js | 13 +++++++------ utils/image-runner.js | 8 ++++---- utils/services/image.js | 2 +- 5 files changed, 28 insertions(+), 12 deletions(-) diff --git a/api/IMPLEMENTATION.md b/api/IMPLEMENTATION.md index eab9674..4664b83 100644 --- a/api/IMPLEMENTATION.md +++ b/api/IMPLEMENTATION.md @@ -31,3 +31,18 @@ A client sends *requests* (T-messages) to a server, which subsequently *replies* - Twait tag[2] jid[4] - Rwait tag[2] - Rinit tag[2] max_jobs[2] formats[j] + +### Job Object +The job object is formatted like this: +```json +{ + "cmd": string, // name of internal image command, e.g. caption + "path": string, // canonical image URL, used for getting the actual image + "url": string, // original image URL, used for message filtering + "params": { // content varies depending on the command, some common parameters are listed here + "type": string, // mime type of output, should usually be the same as input + "delay": integer, // for manually specifying GIF frame delay, set to 0 to use the default delay + ... + } +} +``` \ No newline at end of file diff --git a/api/index.js b/api/index.js index 9cab7d3..64b0950 100644 --- a/api/index.js +++ b/api/index.js @@ -241,7 +241,7 @@ const runJob = (job, ws) => { const object = JSON.parse(job.msg); // If the image has a path, it must also have a type - if (object.path && !object.type) { + if (object.path && !object.params.type) { reject(new TypeError("Unknown image type")); } diff --git a/classes/imageCommand.js b/classes/imageCommand.js index 57bf388..74b2a07 100644 --- a/classes/imageCommand.js +++ b/classes/imageCommand.js @@ -47,7 +47,8 @@ class ImageCommand extends Command { runningCommands.set(this.message.author.id, this.message.createdAt); const magickParams = { - cmd: this.constructor.command + cmd: this.constructor.command, + params: {} }; if (this.constructor.requiresImage) { @@ -64,9 +65,9 @@ class ImageCommand extends Command { return "I've been rate-limited by Tenor. Please try uploading your GIF elsewhere."; } magickParams.path = image.path; - magickParams.type = image.type; + magickParams.params.type = image.type; magickParams.url = image.url; // technically not required but can be useful for text filtering - magickParams.delay = image.delay ? image.delay : 0; + magickParams.params.delay = image.delay ? image.delay : 0; if (this.constructor.requiresGIF) magickParams.onlyGIF = true; } catch (e) { runningCommands.delete(this.message.author.id); @@ -84,15 +85,15 @@ class ImageCommand extends Command { switch (typeof this.params) { case "function": - Object.assign(magickParams, this.params(magickParams.url, magickParams.delay)); + Object.assign(magickParams.params, this.params(magickParams.url, magickParams.delay)); break; case "object": - Object.assign(magickParams, this.params); + Object.assign(magickParams.params, this.params); break; } let status; - if (magickParams.type === "image/gif") { + if (magickParams.params.type === "image/gif") { status = await this.processMessage(this.message); } else { this.client.sendChannelTyping(this.message.channel.id); diff --git a/utils/image-runner.js b/utils/image-runner.js index cbe7221..98a8e92 100644 --- a/utils/image-runner.js +++ b/utils/image-runner.js @@ -11,7 +11,7 @@ function run(object) { // If the image has a path, it must also have a type let promise = new Promise((resolveTest) => { resolveTest(); }); // no-op if (object.path) { - if (object.type !== "image/gif" && object.onlyGIF) resolve({ + if (object.params.type !== "image/gif" && object.onlyGIF) resolve({ buffer: Buffer.alloc(0), fileExtension: "nogif" }); @@ -20,10 +20,10 @@ function run(object) { // Convert from a MIME type (e.g. "image/png") to something ImageMagick understands (e.g. "png"). // Don't set `type` directly on the object we are passed as it will be read afterwards. // If no image type is given (say, the command generates its own image), make it a PNG. - const fileExtension = object.type ? object.type.split("/")[1] : "png"; + const fileExtension = object.params.type ? object.params.type.split("/")[1] : "png"; promise.then(buf => { - object.data = buf; - const objectWithFixedType = Object.assign({}, object, {type: fileExtension}); + object.params.data = buf; + const objectWithFixedType = Object.assign({}, object.params, {type: fileExtension}); try { const result = magick[object.cmd](objectWithFixedType); const returnObject = { diff --git a/utils/services/image.js b/utils/services/image.js index 593ee33..449f45c 100644 --- a/utils/services/image.js +++ b/utils/services/image.js @@ -71,7 +71,7 @@ class ImageWorker extends BaseServiceWorker { if (connection.conn.readyState !== 0 && connection.conn.readyState !== 1) { continue; } - if (!connection.formats[object.cmd].includes(object.type)) continue; + if (object.params.type && !connection.formats[object.cmd].includes(object.params.type)) continue; idealServers.push({ addr: address, load: connection.njobs / connection.max