diff --git a/Dockerfile.api b/Dockerfile.api index c1437f3..975ab59 100644 --- a/Dockerfile.api +++ b/Dockerfile.api @@ -55,6 +55,6 @@ RUN npm run build USER esmBot -EXPOSE 8080 8081 +EXPOSE 3762 ENTRYPOINT ["node", "api/index.js"] \ No newline at end of file diff --git a/api/IMPLEMENTATION.md b/api/IMPLEMENTATION.md new file mode 100644 index 0000000..4664b83 --- /dev/null +++ b/api/IMPLEMENTATION.md @@ -0,0 +1,48 @@ +# esmBot Image API +The esmBot image API is a combined HTTP and WebSocket API running on port 3762. The API supports very basic authentication, which is defined on the server via the PASS environment variable and is sent from the client via the Authentication header in both HTTP and WS requests. + +## HTTP + +### GET `/image/?id=` +Get image data after job is finished running. The Content-Type header is properly set. + +## WebSockets +A client sends *requests* (T-messages) to a server, which subsequently *replies* (R-messages) to the client. +### Message IDs +- Rerror 0x01 +- Tqueue 0x02 +- Rqueue 0x03 +- Tcancel 0x04 +- Rcancel 0x05 +- Twait 0x06 +- Rwait 0x07 +- Rinit 0x08 + +### Messages +[n] means n bytes. +[s] means a string that goes until the end of the message. +[j] means JSON data that goes until the end of the message. +`tag` is used to identify a request/response pair, like `lock` in the original API. `jid` is used to identify a job. `job` is a job object. +- Rerror tag[2] error[s] +- Tqueue tag[2] jid[4] job[j] +- Rqueue tag[2] +- Tcancel tag[2] jid[4] +- Rcancel tag[2] +- Twait tag[2] jid[4] +- Rwait tag[2] +- Rinit tag[2] max_jobs[2] formats[j] + +### Job Object +The job object is formatted like this: +```json +{ + "cmd": string, // name of internal image command, e.g. caption + "path": string, // canonical image URL, used for getting the actual image + "url": string, // original image URL, used for message filtering + "params": { // content varies depending on the command, some common parameters are listed here + "type": string, // mime type of output, should usually be the same as input + "delay": integer, // for manually specifying GIF frame delay, set to 0 to use the default delay + ... + } +} +``` \ No newline at end of file diff --git a/api/index.js b/api/index.js index 3cad9e0..64b0950 100644 --- a/api/index.js +++ b/api/index.js @@ -4,9 +4,32 @@ import { cpus } from "os"; import { Worker } from "worker_threads"; import { join } from "path"; import { createServer } from "http"; -import ws from "ws"; +import { WebSocketServer } from "ws"; import { fileURLToPath } from "url"; import { dirname } from "path"; +import { createRequire } from "module"; + +const nodeRequire = createRequire(import.meta.url); +const magick = nodeRequire(`../build/${process.env.DEBUG && process.env.DEBUG === "true" ? "Debug" : "Release"}/image.node`); + +/* +Rerror 0x01 +Tqueue 0x02 +Rqueue 0x03 +Tcancel 0x04 +Rcancel 0x05 +Twait 0x06 +Rwait 0x07 +Rinit 0x08 +*/ +const Rerror = 0x01; +const Tqueue = 0x02; +const Rqueue = 0x03; +const Tcancel = 0x04; +const Rcancel = 0x05; +const Twait = 0x06; +const Rwait = 0x07; +const Rinit = 0x08; const start = process.hrtime(); const log = (msg, jobNum) => { @@ -23,30 +46,34 @@ class JobCache extends Map { } const jobs = new JobCache(); -// Should look like UUID : { msg: "request", num: } +// Should look like ID : { msg: "request", num: } const queue = []; -// Array of UUIDs - -import { v4 as uuidv4 } from "uuid"; +// Array of IDs const MAX_JOBS = process.env.JOBS !== "" && process.env.JOBS !== undefined ? parseInt(process.env.JOBS) : cpus().length * 4; // Completely arbitrary, should usually be some multiple of your amount of cores const PASS = process.env.PASS !== "" && process.env.PASS !== undefined ? process.env.PASS : undefined; let jobAmount = 0; -const acceptJob = (uuid, sock) => { +const acceptJob = (id, sock) => { jobAmount++; queue.shift(); - const job = jobs.get(uuid); + const job = jobs.get(id); return runJob({ - uuid: uuid, + id: id, msg: job.msg, num: job.num }, sock).then(() => { - log(`Job ${uuid} has finished`); + log(`Job ${id} has finished`); }).catch((err) => { - console.error(`Error on job ${uuid}:`, err); - jobs.delete(uuid); - sock.send(Buffer.concat([Buffer.from([0x2]), Buffer.from(uuid), Buffer.from(err.message)])); + console.error(`Error on job ${id}:`, err); + const newJob = jobs.get(id); + if (!newJob.tag) { + newJob.error = err.message; + jobs.set(id, newJob); + return; + } + jobs.delete(id); + sock.send(Buffer.concat([Buffer.from([Rerror]), newJob.tag, Buffer.from(err.message)])); }).finally(() => { jobAmount--; if (queue.length > 0) { @@ -55,10 +82,18 @@ const acceptJob = (uuid, sock) => { }); }; -const wss = new ws.Server({ clientTracking: true, noServer: true }); +const wss = new WebSocketServer({ clientTracking: true, noServer: true }); wss.on("connection", (ws, request) => { log(`WS client ${request.socket.remoteAddress}:${request.socket.remotePort} has connected`); + const num = Buffer.alloc(2); + num.writeUInt16LE(MAX_JOBS); + const formats = {}; + for (const cmd of Object.keys(magick)) { + formats[cmd] = ["image/png", "image/gif", "image/jpeg", "image/webp"]; + } + const init = Buffer.concat([Buffer.from([Rinit]), Buffer.from([0x00, 0x00]), num, Buffer.from(JSON.stringify(formats))]); + ws.send(init); ws.on("error", (err) => { console.error(err); @@ -66,31 +101,47 @@ wss.on("connection", (ws, request) => { ws.on("message", (msg) => { const opcode = msg.readUint8(0); - const req = msg.toString().slice(1,msg.length); - console.log(req); - // 0x00 == Cancel job - // 0x01 == Queue job - if (opcode == 0x00) { - delete queue[queue.indexOf(req) - 1]; - jobs.delete(req); - } else if (opcode == 0x01) { - const length = parseInt(req.slice(0, 1)); - const num = req.slice(1, length + 1); - const obj = req.slice(length + 1); + const tag = msg.slice(1, 3); + const req = msg.toString().slice(3); + if (opcode == Tqueue) { + const id = msg.readUInt32LE(3); + const obj = msg.slice(7); const job = { msg: obj, num: jobAmount }; - const uuid = uuidv4(); - jobs.set(uuid, job); - queue.push(uuid); + jobs.set(id, job); + queue.push(id); - const newBuffer = Buffer.concat([Buffer.from([0x00]), Buffer.from(uuid), Buffer.from(num)]); + const newBuffer = Buffer.concat([Buffer.from([Rqueue]), tag]); ws.send(newBuffer); if (jobAmount < MAX_JOBS) { - log(`Got WS request for job ${job.msg} with id ${uuid}`, job.num); - acceptJob(uuid, ws); + log(`Got WS request for job ${job.msg} with id ${id}`, job.num); + acceptJob(id, ws); } else { - log(`Got WS request for job ${job.msg} with id ${uuid}, queued in position ${queue.indexOf(uuid)}`, job.num); + log(`Got WS request for job ${job.msg} with id ${id}, queued in position ${queue.indexOf(id)}`, job.num); } + } else if (opcode == Tcancel) { + delete queue[queue.indexOf(req) - 1]; + jobs.delete(req); + const cancelResponse = Buffer.concat([Buffer.from([Rcancel]), tag]); + ws.send(cancelResponse); + } else if (opcode == Twait) { + const id = msg.readUInt32LE(3); + const job = jobs.get(id); + if (!job) { + const errorResponse = Buffer.concat([Buffer.from([Rerror]), tag, Buffer.from("Invalid job ID")]); + ws.send(errorResponse); + return; + } + if (job.error) { + jobs.delete(id); + const errorResponse = Buffer.concat([Buffer.from([Rerror]), tag, Buffer.from(job.error)]); + ws.send(errorResponse); + return; + } + job.tag = tag; + jobs.set(id, job); + //const waitResponse = Buffer.concat([Buffer.from([Rwait]), tag]); + //ws.send(waitResponse); } else { log("Could not parse WS message"); } @@ -117,41 +168,36 @@ httpServer.on("request", async (req, res) => { return res.end("401 Unauthorized"); } const reqUrl = new URL(req.url, `http://${req.headers.host}`); - if (reqUrl.pathname === "/status" && req.method === "GET") { - log(`Sending server status to ${req.socket.remoteAddress}:${req.socket.remotePort} via HTTP`); - const statusObject = { - load: MAX_JOBS - jobAmount, - queued: queue.length - }; - return res.end(JSON.stringify(statusObject)); - } else if (reqUrl.pathname === "/running" && req.method === "GET") { - log(`Sending currently running jobs to ${req.socket.remoteAddress}:${req.socket.remotePort} via HTTP`); - const keys = jobs.keys(); - const newObject = { queued: queue.length, runningJobs: jobAmount, max: MAX_JOBS }; - for (const key of keys) { - const validKeys = Object.keys(jobs.get(key)).filter((value) => value !== "addr" && value !== "port" && value !== "data" && value !== "ext"); - newObject[key] = {}; - for (const validKey of validKeys) { - if (validKey === "msg") { - newObject[key][validKey] = JSON.parse(jobs.get(key)[validKey]); - } else { - newObject[key][validKey] = jobs.get(key)[validKey]; - } - } - } - return res.end(JSON.stringify(newObject)); - } else if (reqUrl.pathname === "/image" && req.method === "GET") { + if (reqUrl.pathname === "/image" && req.method === "GET") { if (!reqUrl.searchParams.has("id")) { res.statusCode = 400; return res.end("400 Bad Request"); } - const id = reqUrl.searchParams.get("id"); + const id = parseInt(reqUrl.searchParams.get("id")); if (!jobs.has(id)) { res.statusCode = 410; return res.end("410 Gone"); } log(`Sending image data for job ${id} to ${req.socket.remoteAddress}:${req.socket.remotePort} via HTTP`); - res.setHeader("ext", jobs.get(id).ext); + const ext = jobs.get(id).ext; + let contentType; + switch (ext) { + case "gif": + contentType = "image/gif"; + break; + case "png": + contentType = "image/png"; + break; + case "jpeg": + case "jpg": + contentType = "image/jpeg"; + break; + case "webp": + contentType = "image/webp"; + break; + } + if (contentType) res.setHeader("Content-Type", contentType); + else res.setHeader("Content-Type", ext); const data = jobs.get(id).data; jobs.delete(id); return res.end(data, (err) => { @@ -185,17 +231,17 @@ httpServer.on("error", (e) => { console.error("An HTTP error occurred: ", e); }); -httpServer.listen(8080, () => { - log("HTTP and WS listening on port 8080"); +httpServer.listen(3762, () => { + log("HTTP and WS listening on port 3762"); }); -const runJob = (job, sock) => { +const runJob = (job, ws) => { return new Promise((resolve, reject) => { - log(`Job ${job.uuid} starting...`, job.num); + log(`Job ${job.id} starting...`, job.num); const object = JSON.parse(job.msg); // If the image has a path, it must also have a type - if (object.path && !object.type) { + if (object.path && !object.params.type) { reject(new TypeError("Unknown image type")); } @@ -206,34 +252,21 @@ const runJob = (job, sock) => { worker.terminate(); reject(new Error("Job timed out")); }, 900000); - log(`Job ${job.uuid} started`, job.num); + log(`Job ${job.id} started`, job.num); worker.once("message", (data) => { clearTimeout(timeout); - log(`Sending result of job ${job.uuid} back to the bot`, job.num); - const jobObject = jobs.get(job.uuid); + log(`Sending result of job ${job.id} back to the bot`, job.num); + const jobObject = jobs.get(job.id); jobObject.data = data.buffer; jobObject.ext = data.fileExtension; - jobs.set(job.uuid, jobObject); - sock.send(Buffer.concat([Buffer.from([0x1]), Buffer.from(job.uuid)]), () => { - return resolve(); - }); + jobs.set(job.id, jobObject); + const waitResponse = Buffer.concat([Buffer.from([Rwait]), jobObject.tag]); + ws.send(waitResponse); + resolve(); }); worker.once("error", (e) => { clearTimeout(timeout); reject(e); }); - /*run(object).then((data) => { - log(`Sending result of job ${job.uuid} back to the bot`, job.num); - const jobObject = jobs.get(job.uuid); - jobObject.data = data.buffer; - jobObject.ext = data.fileExtension; - jobs.set(job.uuid, jobObject); - sock.write(Buffer.concat([Buffer.from([0x1]), Buffer.from(job.uuid)]), (e) => { - if (e) return reject(e); - return resolve(); - }); - }).catch(e => { - reject(e); - });*/ }); }; \ No newline at end of file diff --git a/classes/imageCommand.js b/classes/imageCommand.js index 57bf388..74b2a07 100644 --- a/classes/imageCommand.js +++ b/classes/imageCommand.js @@ -47,7 +47,8 @@ class ImageCommand extends Command { runningCommands.set(this.message.author.id, this.message.createdAt); const magickParams = { - cmd: this.constructor.command + cmd: this.constructor.command, + params: {} }; if (this.constructor.requiresImage) { @@ -64,9 +65,9 @@ class ImageCommand extends Command { return "I've been rate-limited by Tenor. Please try uploading your GIF elsewhere."; } magickParams.path = image.path; - magickParams.type = image.type; + magickParams.params.type = image.type; magickParams.url = image.url; // technically not required but can be useful for text filtering - magickParams.delay = image.delay ? image.delay : 0; + magickParams.params.delay = image.delay ? image.delay : 0; if (this.constructor.requiresGIF) magickParams.onlyGIF = true; } catch (e) { runningCommands.delete(this.message.author.id); @@ -84,15 +85,15 @@ class ImageCommand extends Command { switch (typeof this.params) { case "function": - Object.assign(magickParams, this.params(magickParams.url, magickParams.delay)); + Object.assign(magickParams.params, this.params(magickParams.url, magickParams.delay)); break; case "object": - Object.assign(magickParams, this.params); + Object.assign(magickParams.params, this.params); break; } let status; - if (magickParams.type === "image/gif") { + if (magickParams.params.type === "image/gif") { status = await this.processMessage(this.message); } else { this.client.sendChannelTyping(this.message.channel.id); diff --git a/commands/general/imagestats.js b/commands/general/imagestats.js index eace592..d2cbd32 100644 --- a/commands/general/imagestats.js +++ b/commands/general/imagestats.js @@ -18,7 +18,7 @@ class ImageStatsCommand extends Command { for (let i = 0; i < servers.length; i++) { embed.embeds[0].fields.push({ name: `Server ${i + 1}`, - value: `Running Jobs: ${servers[i].runningJobs}\nQueued: ${servers[i].queued}\nMax Jobs: ${servers[i].max}` + value: `Running Jobs: ${servers[i].runningJobs}\nQueued: ${Math.max(0, servers[i].runningJobs - servers[i].max)}\nMax Jobs: ${servers[i].max}` }); } return embed; diff --git a/commands/image-editing/caption.js b/commands/image-editing/caption.js index 755907c..246df19 100644 --- a/commands/image-editing/caption.js +++ b/commands/image-editing/caption.js @@ -1,5 +1,5 @@ import ImageCommand from "../../classes/imageCommand.js"; -const allowedFonts = ["futura", "impact", "helvetica", "arial", "roboto", "noto"]; +const allowedFonts = ["futura", "impact", "helvetica", "arial", "roboto", "noto", "times"]; class CaptionCommand extends ImageCommand { params(url) { diff --git a/commands/image-editing/caption2.js b/commands/image-editing/caption2.js index bf08302..22c578d 100644 --- a/commands/image-editing/caption2.js +++ b/commands/image-editing/caption2.js @@ -1,21 +1,27 @@ import ImageCommand from "../../classes/imageCommand.js"; const words = ["me irl", "dank", "follow my second account @esmBot_", "2016", "meme", "wholesome", "reddit", "instagram", "twitter", "facebook", "fortnite", "minecraft", "relatable", "gold", "funny", "template", "hilarious", "memes", "deep fried", "2020", "leafy", "pewdiepie"]; +const allowedFonts = ["futura", "impact", "helvetica", "arial", "roboto", "noto", "times"]; class CaptionTwoCommand extends ImageCommand { params(url) { const newArgs = this.args.filter(item => !item.includes(url)); return { caption: newArgs.length !== 0 ? newArgs.join(" ").replaceAll("&", "\\&").replaceAll(">", "\\>").replaceAll("<", "\\<").replaceAll("\"", "\\"").replaceAll("'", "\\'").replaceAll("%", "\\%") : words.sort(() => 0.5 - Math.random()).slice(0, Math.floor(Math.random() * words.length + 1)).join(" "), - top: !!this.specialArgs.top + top: !!this.specialArgs.top, + font: this.specialArgs.font && allowedFonts.includes(this.specialArgs.font.toLowerCase()) ? this.specialArgs.font.toLowerCase() : "helvetica" }; } - static description = "Adds a me.me caption/tag list to an image/GIF"; + static description = "Adds a me.me caption/tag list to an image"; static aliases = ["tags2", "meirl", "memecaption", "medotmecaption"]; static arguments = ["{text}"]; static flags = [{ name: "top", - description: "Put the caption on the top of an image/GIF instead of the bottom" + description: "Put the caption on the top of an image instead of the bottom" + }, { + name: "font", + type: allowedFonts.join("|"), + description: "Specify the font you want to use (default: `helvetica`)" }]; static noText = "You need to provide some text to add a caption!"; diff --git a/commands/image-editing/meme.js b/commands/image-editing/meme.js index ea3c42f..fb2c3cf 100644 --- a/commands/image-editing/meme.js +++ b/commands/image-editing/meme.js @@ -1,4 +1,5 @@ import ImageCommand from "../../classes/imageCommand.js"; +const allowedFonts = ["futura", "impact", "helvetica", "arial", "roboto", "noto", "times"]; class MemeCommand extends ImageCommand { params(url) { @@ -6,7 +7,8 @@ class MemeCommand extends ImageCommand { const [topText, bottomText] = newArgs.join(" ").split(/(? elem.trim()); return { top: (this.specialArgs.case ? topText : topText.toUpperCase()).replaceAll("&", "\\&").replaceAll(">", "\\>").replaceAll("<", "\\<").replaceAll("\"", "\\"").replaceAll("'", "\\'").replaceAll("%", "\\%"), - bottom: bottomText ? (this.specialArgs.case ? bottomText : bottomText.toUpperCase()).replaceAll("&", "\\&").replaceAll(">", "\\>").replaceAll("<", "\\<").replaceAll("\"", "\\"").replaceAll("'", "\\'").replaceAll("%", "\\%") : "" + bottom: bottomText ? (this.specialArgs.case ? bottomText : bottomText.toUpperCase()).replaceAll("&", "\\&").replaceAll(">", "\\>").replaceAll("<", "\\<").replaceAll("\"", "\\"").replaceAll("'", "\\'").replaceAll("%", "\\%") : "", + font: this.specialArgs.font && allowedFonts.includes(this.specialArgs.font.toLowerCase()) ? this.specialArgs.font.toLowerCase() : "impact" }; } @@ -15,6 +17,10 @@ class MemeCommand extends ImageCommand { static flags = [{ name: "case", description: "Make the meme text case-sensitive (allows for lowercase text)" + }, { + name: "font", + type: allowedFonts.join("|"), + description: "Specify the font you want to use (default: `impact`)" }]; static requiresText = true; diff --git a/commands/image-editing/motivate.js b/commands/image-editing/motivate.js index ab39dce..ea81520 100644 --- a/commands/image-editing/motivate.js +++ b/commands/image-editing/motivate.js @@ -1,4 +1,5 @@ import ImageCommand from "../../classes/imageCommand.js"; +const allowedFonts = ["futura", "impact", "helvetica", "arial", "roboto", "noto", "times"]; class MotivateCommand extends ImageCommand { params(url) { @@ -6,13 +7,19 @@ class MotivateCommand extends ImageCommand { const [topText, bottomText] = newArgs.join(" ").split(/(? elem.trim()); return { top: topText.replaceAll("&", "\\&").replaceAll(">", "\\>").replaceAll("<", "\\<").replaceAll("\"", "\\"").replaceAll("'", "\\'").replaceAll("%", "\\%"), - bottom: bottomText ? bottomText.replaceAll("&", "\\&").replaceAll(">", "\\>").replaceAll("<", "\\<").replaceAll("\"", "\\"").replaceAll("'", "\\'").replaceAll("%", "\\%") : "" + bottom: bottomText ? bottomText.replaceAll("&", "\\&").replaceAll(">", "\\>").replaceAll("<", "\\<").replaceAll("\"", "\\"").replaceAll("'", "\\'").replaceAll("%", "\\%") : "", + font: this.specialArgs.font && allowedFonts.includes(this.specialArgs.font.toLowerCase()) ? this.specialArgs.font.toLowerCase() : "times" }; } static description = "Generates a motivational poster"; static aliases = ["motivational", "motiv", "demotiv", "demotivational", "poster", "motivation", "demotivate"]; static arguments = ["[top text]", "{bottom text}"]; + static flags = [{ + name: "font", + type: allowedFonts.join("|"), + description: "Specify the font you want to use (default: `times`)" + }]; static requiresText = true; static noText = "You need to provide some text to generate a motivational poster!"; diff --git a/natives/caption2.cc b/natives/caption2.cc index 8ac2bf5..84c0c38 100644 --- a/natives/caption2.cc +++ b/natives/caption2.cc @@ -15,6 +15,7 @@ Napi::Value CaptionTwo(const Napi::CallbackInfo &info) { Napi::Buffer data = obj.Get("data").As>(); string caption = obj.Get("caption").As().Utf8Value(); bool top = obj.Get("top").As().Value(); + string font = obj.Get("font").As().Utf8Value(); string type = obj.Get("type").As().Utf8Value(); int delay = obj.Has("delay") ? obj.Get("delay").As().Int32Value() : 0; @@ -39,7 +40,10 @@ Napi::Value CaptionTwo(const Napi::CallbackInfo &info) { caption_image.fillColor("black"); caption_image.font("Helvetica Neue"); caption_image.fontPointsize(width / 17); - caption_image.read("pango:" + caption); + caption_image.read("pango:" + caption + ""); caption_image.extent(Geometry(width, caption_image.rows() + (width / 25)), Magick::CenterGravity); diff --git a/natives/meme.cc b/natives/meme.cc index f5d5dc9..36ce25f 100644 --- a/natives/meme.cc +++ b/natives/meme.cc @@ -15,6 +15,7 @@ Napi::Value Meme(const Napi::CallbackInfo &info) { Napi::Buffer data = obj.Get("data").As>(); string top = obj.Get("top").As().Utf8Value(); string bottom = obj.Get("bottom").As().Utf8Value(); + string font = obj.Get("font").As().Utf8Value(); string type = obj.Get("type").As().Utf8Value(); int delay = obj.Has("delay") ? obj.Get("delay").As().Int32Value() : 0; @@ -43,7 +44,10 @@ Napi::Value Meme(const Napi::CallbackInfo &info) { top_text.font("Impact"); top_text.fontPointsize(width / 12); top_text.textGravity(Magick::CenterGravity); - top_text.read("pango:" + top + ""); + top_text.read("pango:" + top + ""); Image top_text_fill = top_text; top_text_fill.channel(Magick::AlphaChannel); top_text_fill.morphology(Magick::EdgeOutMorphology, "Octagon"); diff --git a/natives/motivate.cc b/natives/motivate.cc index f81804f..62feb5b 100644 --- a/natives/motivate.cc +++ b/natives/motivate.cc @@ -15,6 +15,7 @@ Napi::Value Motivate(const Napi::CallbackInfo &info) { Napi::Buffer data = obj.Get("data").As>(); string top_text = obj.Get("top").As().Utf8Value(); string bottom_text = obj.Get("bottom").As().Utf8Value(); + string font = obj.Get("font").As().Utf8Value(); string type = obj.Get("type").As().Utf8Value(); int delay = obj.Has("delay") ? obj.Get("delay").As().Int32Value() : 0; @@ -40,7 +41,10 @@ Napi::Value Motivate(const Napi::CallbackInfo &info) { top.font("Times"); top.textGravity(Magick::CenterGravity); top.fontPointsize(56); - top.read("pango:" + top_text + ""); + top.read("pango:" + top_text + ""); top.extent(Geometry(bottom_text != "" ? to_string(top.columns()) + "x" + to_string(top.rows()) : to_string(top.columns()) + "x" + diff --git a/servers.json b/servers.json index 8a2d11f..c138d49 100644 --- a/servers.json +++ b/servers.json @@ -3,7 +3,7 @@ { "id": "1", "host": "localhost", "port": 2333, "password": "youshallnotpass", "local": true } ], "image": [ - { "server": "localhost", "auth": "verycoolpass100" } + { "server": "localhost", "auth": "verycoolpass100", "tls": false } ], "searx": [ "https://searx.xyz" diff --git a/utils/image-runner.js b/utils/image-runner.js index cbe7221..98a8e92 100644 --- a/utils/image-runner.js +++ b/utils/image-runner.js @@ -11,7 +11,7 @@ function run(object) { // If the image has a path, it must also have a type let promise = new Promise((resolveTest) => { resolveTest(); }); // no-op if (object.path) { - if (object.type !== "image/gif" && object.onlyGIF) resolve({ + if (object.params.type !== "image/gif" && object.onlyGIF) resolve({ buffer: Buffer.alloc(0), fileExtension: "nogif" }); @@ -20,10 +20,10 @@ function run(object) { // Convert from a MIME type (e.g. "image/png") to something ImageMagick understands (e.g. "png"). // Don't set `type` directly on the object we are passed as it will be read afterwards. // If no image type is given (say, the command generates its own image), make it a PNG. - const fileExtension = object.type ? object.type.split("/")[1] : "png"; + const fileExtension = object.params.type ? object.params.type.split("/")[1] : "png"; promise.then(buf => { - object.data = buf; - const objectWithFixedType = Object.assign({}, object, {type: fileExtension}); + object.params.data = buf; + const objectWithFixedType = Object.assign({}, object.params, {type: fileExtension}); try { const result = magick[object.cmd](objectWithFixedType); const returnObject = { diff --git a/utils/image.js b/utils/image.js index 6ab807a..1486b60 100644 --- a/utils/image.js +++ b/utils/image.js @@ -2,7 +2,7 @@ import fetch from "node-fetch"; import fs from "fs"; import fileType from "file-type"; -const formats = ["image/jpeg", "image/png", "image/webp", "image/gif", "video/mp4", "video/webm", "video/mov"]; +const formats = ["image/jpeg", "image/png", "image/webp", "image/gif", "video/mp4", "video/webm", "video/quicktime"]; export const jobs = {}; diff --git a/utils/imageConnection.js b/utils/imageConnection.js new file mode 100644 index 0000000..4b71d92 --- /dev/null +++ b/utils/imageConnection.js @@ -0,0 +1,167 @@ +import fetch from "node-fetch"; +import WebSocket from "ws"; +import * as logger from "./logger.js"; +import { setTimeout } from "timers/promises"; + +/* +Rerror 0x01 +Tqueue 0x02 +Rqueue 0x03 +Tcancel 0x04 +Rcancel 0x05 +Twait 0x06 +Rwait 0x07 +Rinit 0x08 +*/ +const Rerror = 0x01; +const Tqueue = 0x02; +const Rqueue = 0x03; +const Tcancel = 0x04; +const Rcancel = 0x05; +const Twait = 0x06; +const Rwait = 0x07; +const Rinit = 0x08; + +class ImageConnection { + constructor(host, auth, tls = false) { + this.requests = new Map(); + this.host = host; + this.port = 3762; + this.auth = auth; + this.tag = null; + this.disconnected = false; + this.njobs = 0; + this.max = 0; + this.formats = {}; + this.wsproto = null; + if (tls) { + this.wsproto = "wss"; + } else { + this.wsproto = "ws"; + } + this.sockurl = `${this.wsproto}://${host}:${this.port}/sock`; + this.conn = new WebSocket(this.sockurl, { + headers: { + "Authentication": auth && auth !== "" ? auth : undefined + } + }); + let httpproto; + if (tls) { + httpproto = "https"; + } else { + httpproto = "http"; + } + this.httpurl = `${httpproto}://${host}:${this.port}/image`; + this.conn.on("message", (msg) => this.onMessage(msg)); + this.conn.once("error", (err) => this.onError(err)); + this.conn.once("close", () => this.onClose()); + } + + onMessage(msg) { + const op = msg.readUint8(0); + if (op === Rinit) { + this.max = msg.readUint16LE(3); + this.formats = JSON.parse(msg.toString("utf8", 5)); + return; + } + const tag = msg.readUint16LE(1); + const promise = this.requests.get(tag); + this.requests.delete(tag); + if (op === Rqueue) this.njobs++; + if (op === Rcancel || op === Rwait) this.njobs--; + if (op === Rerror) { + this.njobs--; + promise.reject(new Error(msg.slice(3, msg.length).toString())); + return; + } + promise.resolve(); + } + + onError(e) { + logger.error(e.toString()); + } + + async onClose() { + for (const promise of this.requests.values()) { + promise.reject(new Error("Request ended prematurely due to a closed connection")); + } + this.requests.clear(); + if (!this.disconnected) { + logger.warn(`Lost connection to ${this.host}:${this.port}, attempting to reconnect in 5 seconds...`); + await setTimeout(5000); + this.conn = new WebSocket(this.sockurl, { + headers: { + "Authentication": this.auth + } + }); + this.conn.on("message", (msg) => this.onMessage(msg)); + this.conn.once("error", (err) => this.onError(err)); + this.conn.once("close", () => this.onClose()); + } + this.disconnected = false; + } + + close() { + this.disconnected = true; + this.conn.close(); + } + + queue(jobid, jobobj) { + const str = JSON.stringify(jobobj); + const buf = Buffer.alloc(4); + buf.writeUint32LE(jobid); + return this.do(Tqueue, Buffer.concat([buf, Buffer.from(str)])); + } + + wait(jobid) { + const buf = Buffer.alloc(4); + buf.writeUint32LE(jobid); + return this.do(Twait, buf); + } + + cancel(jobid) { + const buf = Buffer.alloc(4); + buf.writeUint32LE(jobid); + return this.do(Tcancel, buf); + } + + async getOutput(jobid) { + const req = await fetch(`${this.httpurl}?id=${jobid}`, { + headers: { + "Authentication": this.auth && this.auth !== "" ? this.auth : undefined + } + }); + const contentType = req.headers.get("Content-Type"); + let type; + switch (contentType) { + case "image/gif": + type = "gif"; + break; + case "image/png": + type = "png"; + break; + case "image/jpeg": + type = "jpg"; + break; + case "image/webp": + type = "webp"; + break; + } + return { buffer: Buffer.from(await req.arrayBuffer()), type }; + } + + async do(op, data) { + const buf = Buffer.alloc(1 + 2); + let tag = this.tag++; + if (tag > 65535) tag = this.tag = 0; + buf.writeUint8(op); + buf.writeUint16LE(tag, 1); + this.conn.send(Buffer.concat([buf, data])); + const promise = new Promise((resolve, reject) => { + this.requests.set(tag, { resolve, reject }); + }); + return promise; + } +} + +export default ImageConnection; \ No newline at end of file diff --git a/utils/services/image.js b/utils/services/image.js index 6bcb24f..449f45c 100644 --- a/utils/services/image.js +++ b/utils/services/image.js @@ -1,12 +1,11 @@ import { BaseServiceWorker } from "eris-fleet"; import * as logger from "../logger.js"; -import fetch from "node-fetch"; -import WebSocket from "ws"; import fs from "fs"; import path from "path"; import { fileURLToPath } from "url"; import { Worker } from "worker_threads"; -import { EventEmitter } from "events"; + +import ImageConnection from "../imageConnection.js"; class ImageWorker extends BaseServiceWorker { constructor(setup) { @@ -16,6 +15,7 @@ class ImageWorker extends BaseServiceWorker { this.jobs = {}; this.connections = new Map(); this.servers = JSON.parse(fs.readFileSync("./servers.json", { encoding: "utf8" })).image; + this.nextID = 0; } this.begin().then(() => this.serviceReady()); @@ -41,46 +41,18 @@ class ImageWorker extends BaseServiceWorker { } async getRunning() { - let serversLeft = this.connections.size; const statuses = []; - for (const address of this.connections.keys()) { - const connection = this.connections.get(address); - if (connection.readyState !== 0 && connection.readyState !== 1) { - serversLeft--; + for (const [address, connection] of this.connections) { + if (connection.conn.readyState !== 0 && connection.conn.readyState !== 1) { continue; } - const controller = new AbortController(); // eslint-disable-line no-undef - const timeout = setTimeout(() => { - controller.abort(); - }, 2000); - try { - const auth = this.servers.filter((val) => val.server === address)[0].auth; - const statusRequest = await fetch(`http://${address}:8080/running`, { - signal: controller.signal, - headers: { - "Authentication": auth && auth !== "" ? auth : undefined - } - }); - clearTimeout(timeout); - const status = await statusRequest.json(); - serversLeft--; - statuses.push(status); - } catch (e) { - if (e.name === "AbortError") { - serversLeft--; - continue; - } else if (e.code === "ECONNREFUSED") { - serversLeft--; - continue; - } - throw e; - } - } - if (!serversLeft) { - return statuses; - } else { - throw new Error("Loop ended before all servers could be checked"); + statuses.push({ + address, + runningJobs: connection.njobs, + max: connection.max + }); } + return statuses; } async chooseServer(ideal) { @@ -89,119 +61,28 @@ class ImageWorker extends BaseServiceWorker { return b.load - a.load; }).filter((e, i, array) => { return !(e.load < array[0].load); - }).sort((a, b) => { - return a.queued - b.queued; }); return sorted[0]; } - async getIdeal() { - let serversLeft = this.connections.size; - if (serversLeft < this.servers.length) { - for (const server of this.servers) { - try { - if (!this.connections.has(server.server)) await this.connect(server.server, server.auth); - } catch (e) { - logger.error(e); - } - } - serversLeft = this.connections.size; - } + async getIdeal(object) { const idealServers = []; - for (const address of this.connections.keys()) { - const connection = this.connections.get(address); - if (connection.readyState !== 0 && connection.readyState !== 1) { - serversLeft--; + for (const [address, connection] of this.connections) { + if (connection.conn.readyState !== 0 && connection.conn.readyState !== 1) { continue; } - const controller = new AbortController(); // eslint-disable-line no-undef - const timeout = setTimeout(() => { - controller.abort(); - }, 5000); - try { - const auth = this.servers.filter((val) => val.server === address)[0].auth; - const statusRequest = await fetch(`http://${address}:8080/status`, { - signal: controller.signal, - headers: { - "Authentication": auth && auth !== "" ? auth : undefined - } - }); - clearTimeout(timeout); - const status = await statusRequest.json(); - serversLeft--; - idealServers.push({ - addr: address, - load: status.load, - queued: status.queued - }); - } catch (e) { - if (e.name === "AbortError") { - serversLeft--; - continue; - } else if (e.code === "ECONNREFUSED") { - serversLeft--; - continue; - } - throw e; - } finally { - clearTimeout(timeout); - } - } - if (!serversLeft) { - const server = await this.chooseServer(idealServers); - return { addr: server.addr, sock: this.connections.get(server.addr) }; - } else { - throw new Error("Loop ended before all servers could be checked"); + if (object.params.type && !connection.formats[object.cmd].includes(object.params.type)) continue; + idealServers.push({ + addr: address, + load: connection.njobs / connection.max + }); } + const server = await this.chooseServer(idealServers); + return this.connections.get(server.addr); } async connect(server, auth) { - const connection = new WebSocket(`ws://${server}:8080/sock`, { - headers: { - "Authentication": auth && auth !== "" ? auth : undefined - } - }); - connection.on("message", async (msg) => { - const opcode = msg.readUint8(0); - const req = msg.slice(37, msg.length); - const uuid = msg.slice(1, 37).toString(); - if (opcode === 0x00) { // Job queued - if (this.jobs[req]) { - this.jobs[req].event.emit("uuid", uuid); - } - } else if (opcode === 0x01) { // Job completed successfully - // the image API sends all job responses over the same socket; make sure this is ours - if (this.jobs[uuid]) { - const imageReq = await fetch(`http://${server}:8080/image?id=${uuid}`, { - headers: { - "Authentication": auth && auth !== "" ? auth : undefined - } - }); - const image = Buffer.from(await imageReq.arrayBuffer()); - // The response data is given as the file extension/ImageMagick type of the image (e.g. "png"), followed - // by a newline, followed by the image data. - - this.jobs[uuid].event.emit("image", image, imageReq.headers.get("ext")); - } - } else if (opcode === 0x02) { // Job errored - if (this.jobs[uuid]) { - this.jobs[uuid].event.emit("error", new Error(req)); - } - } - }); - connection.on("error", (e) => { - logger.error(e.toString()); - }); - connection.once("close", () => { - for (const uuid of Object.keys(this.jobs)) { - if (this.jobs[uuid].addr === server) { - this.jobs[uuid].event.emit("error", "Job ended prematurely due to a closed connection; please run your image job again"); - delete this.jobs[uuid]; - } - } - //logger.log(`Lost connection to ${server}, attempting to reconnect...`); - this.connections.delete(server); - }); + const connection = new ImageConnection(server, auth); this.connections.set(server, connection); } @@ -209,70 +90,38 @@ class ImageWorker extends BaseServiceWorker { for (const connection of this.connections.values()) { connection.close(); } - for (const uuid of Object.keys(this.jobs)) { - this.jobs[uuid].event.emit("error", "Job ended prematurely (not really an error; just run your image job again)"); - delete this.jobs[uuid]; - } this.connections.clear(); return; } - async start(object, num) { - const currentServer = await this.getIdeal(); - const data = Buffer.concat([Buffer.from([0x01 /* queue job */]), Buffer.from(num.length.toString()), Buffer.from(num), Buffer.from(JSON.stringify(object))]); - currentServer.sock.send(data); - const event = new EventEmitter(); - this.jobs[num] = { event, addr: currentServer.addr }; - const uuid = await new Promise((resolve, reject) => { - event.once("uuid", (uuid) => resolve(uuid)); - event.once("error", reject); + waitForWorker(worker) { + return new Promise((resolve, reject) => { + worker.once("message", (data) => { + resolve({ + buffer: Buffer.from([...data.buffer]), + type: data.fileExtension + }); + }); + worker.once("error", reject); }); - delete this.jobs[num]; - this.jobs[uuid] = { event: event, addr: currentServer.addr }; - return { uuid: uuid, event: event }; } - run(object) { - return new Promise((resolve, reject) => { - if (process.env.API === "true") { - // Connect to best image server - const num = Math.floor(Math.random() * 100000).toString().slice(0, 5); - const timeout = setTimeout(() => { - if (this.jobs[num]) delete this.jobs[num]; - reject("The image request timed out after 25 seconds. Try uploading your image elsewhere."); - }, 25000); - this.start(object, num).then((data) => { - clearTimeout(timeout); - if (!data.event) reject("Not connected to image server"); - data.event.once("image", (image, type) => { - delete this.jobs[data.uuid]; - const payload = { - // Take just the image data - buffer: image, - type: type - }; - resolve(payload); - }); - data.event.once("error", (err) => { - delete this.jobs[data.uuid]; - reject(err); - }); - return; - }).catch(err => reject(err)); - } else { - // Called from command (not using image API) - const worker = new Worker(path.join(path.dirname(fileURLToPath(import.meta.url)), "../image-runner.js"), { - workerData: object - }); - worker.once("message", (data) => { - resolve({ - buffer: Buffer.from([...data.buffer]), - type: data.fileExtension - }); - }); - worker.once("error", reject); - } - }); + async run(object) { + if (process.env.API === "true") { + let num = this.nextID++; + if (num > 4294967295) num = this.nextID = 0; + const currentServer = await this.getIdeal(object); + await currentServer.queue(num, object); + await currentServer.wait(num); + const output = await currentServer.getOutput(num); + return output; + } else { + // Called from command (not using image API) + const worker = new Worker(path.join(path.dirname(fileURLToPath(import.meta.url)), "../image-runner.js"), { + workerData: object + }); + return await this.waitForWorker(worker); + } } async handleCommand(data) {