Merge branch 'image-api-logic'

This commit is contained in:
Essem 2021-12-10 00:13:46 -06:00
commit 5d89d78b44
No known key found for this signature in database
GPG Key ID: 7D497397CC3A2A8C
17 changed files with 432 additions and 303 deletions

View File

@ -55,6 +55,6 @@ RUN npm run build
USER esmBot
EXPOSE 8080 8081
EXPOSE 3762
ENTRYPOINT ["node", "api/index.js"]

48
api/IMPLEMENTATION.md Normal file
View File

@ -0,0 +1,48 @@
# esmBot Image API
The esmBot image API is a combined HTTP and WebSocket API running on port 3762. The API supports very basic authentication, which is defined on the server via the PASS environment variable and is sent from the client via the Authentication header in both HTTP and WS requests.
## HTTP
### GET `/image/?id=<job id>`
Get image data after job is finished running. The Content-Type header is properly set.
## WebSockets
A client sends *requests* (T-messages) to a server, which subsequently *replies* (R-messages) to the client.
### Message IDs
- Rerror 0x01
- Tqueue 0x02
- Rqueue 0x03
- Tcancel 0x04
- Rcancel 0x05
- Twait 0x06
- Rwait 0x07
- Rinit 0x08
### Messages
[n] means n bytes.
[s] means a string that goes until the end of the message.
[j] means JSON data that goes until the end of the message.
`tag` is used to identify a request/response pair, like `lock` in the original API. `jid` is used to identify a job. `job` is a job object.
- Rerror tag[2] error[s]
- Tqueue tag[2] jid[4] job[j]
- Rqueue tag[2]
- Tcancel tag[2] jid[4]
- Rcancel tag[2]
- Twait tag[2] jid[4]
- Rwait tag[2]
- Rinit tag[2] max_jobs[2] formats[j]
### Job Object
The job object is formatted like this:
```json
{
"cmd": string, // name of internal image command, e.g. caption
"path": string, // canonical image URL, used for getting the actual image
"url": string, // original image URL, used for message filtering
"params": { // content varies depending on the command, some common parameters are listed here
"type": string, // mime type of output, should usually be the same as input
"delay": integer, // for manually specifying GIF frame delay, set to 0 to use the default delay
...
}
}
```

View File

@ -4,9 +4,32 @@ import { cpus } from "os";
import { Worker } from "worker_threads";
import { join } from "path";
import { createServer } from "http";
import ws from "ws";
import { WebSocketServer } from "ws";
import { fileURLToPath } from "url";
import { dirname } from "path";
import { createRequire } from "module";
const nodeRequire = createRequire(import.meta.url);
const magick = nodeRequire(`../build/${process.env.DEBUG && process.env.DEBUG === "true" ? "Debug" : "Release"}/image.node`);
/*
Rerror 0x01
Tqueue 0x02
Rqueue 0x03
Tcancel 0x04
Rcancel 0x05
Twait 0x06
Rwait 0x07
Rinit 0x08
*/
const Rerror = 0x01;
const Tqueue = 0x02;
const Rqueue = 0x03;
const Tcancel = 0x04;
const Rcancel = 0x05;
const Twait = 0x06;
const Rwait = 0x07;
const Rinit = 0x08;
const start = process.hrtime();
const log = (msg, jobNum) => {
@ -23,30 +46,34 @@ class JobCache extends Map {
}
const jobs = new JobCache();
// Should look like UUID : { msg: "request", num: <job number> }
// Should look like ID : { msg: "request", num: <job number> }
const queue = [];
// Array of UUIDs
import { v4 as uuidv4 } from "uuid";
// Array of IDs
const MAX_JOBS = process.env.JOBS !== "" && process.env.JOBS !== undefined ? parseInt(process.env.JOBS) : cpus().length * 4; // Completely arbitrary, should usually be some multiple of your amount of cores
const PASS = process.env.PASS !== "" && process.env.PASS !== undefined ? process.env.PASS : undefined;
let jobAmount = 0;
const acceptJob = (uuid, sock) => {
const acceptJob = (id, sock) => {
jobAmount++;
queue.shift();
const job = jobs.get(uuid);
const job = jobs.get(id);
return runJob({
uuid: uuid,
id: id,
msg: job.msg,
num: job.num
}, sock).then(() => {
log(`Job ${uuid} has finished`);
log(`Job ${id} has finished`);
}).catch((err) => {
console.error(`Error on job ${uuid}:`, err);
jobs.delete(uuid);
sock.send(Buffer.concat([Buffer.from([0x2]), Buffer.from(uuid), Buffer.from(err.message)]));
console.error(`Error on job ${id}:`, err);
const newJob = jobs.get(id);
if (!newJob.tag) {
newJob.error = err.message;
jobs.set(id, newJob);
return;
}
jobs.delete(id);
sock.send(Buffer.concat([Buffer.from([Rerror]), newJob.tag, Buffer.from(err.message)]));
}).finally(() => {
jobAmount--;
if (queue.length > 0) {
@ -55,10 +82,18 @@ const acceptJob = (uuid, sock) => {
});
};
const wss = new ws.Server({ clientTracking: true, noServer: true });
const wss = new WebSocketServer({ clientTracking: true, noServer: true });
wss.on("connection", (ws, request) => {
log(`WS client ${request.socket.remoteAddress}:${request.socket.remotePort} has connected`);
const num = Buffer.alloc(2);
num.writeUInt16LE(MAX_JOBS);
const formats = {};
for (const cmd of Object.keys(magick)) {
formats[cmd] = ["image/png", "image/gif", "image/jpeg", "image/webp"];
}
const init = Buffer.concat([Buffer.from([Rinit]), Buffer.from([0x00, 0x00]), num, Buffer.from(JSON.stringify(formats))]);
ws.send(init);
ws.on("error", (err) => {
console.error(err);
@ -66,31 +101,47 @@ wss.on("connection", (ws, request) => {
ws.on("message", (msg) => {
const opcode = msg.readUint8(0);
const req = msg.toString().slice(1,msg.length);
console.log(req);
// 0x00 == Cancel job
// 0x01 == Queue job
if (opcode == 0x00) {
delete queue[queue.indexOf(req) - 1];
jobs.delete(req);
} else if (opcode == 0x01) {
const length = parseInt(req.slice(0, 1));
const num = req.slice(1, length + 1);
const obj = req.slice(length + 1);
const tag = msg.slice(1, 3);
const req = msg.toString().slice(3);
if (opcode == Tqueue) {
const id = msg.readUInt32LE(3);
const obj = msg.slice(7);
const job = { msg: obj, num: jobAmount };
const uuid = uuidv4();
jobs.set(uuid, job);
queue.push(uuid);
jobs.set(id, job);
queue.push(id);
const newBuffer = Buffer.concat([Buffer.from([0x00]), Buffer.from(uuid), Buffer.from(num)]);
const newBuffer = Buffer.concat([Buffer.from([Rqueue]), tag]);
ws.send(newBuffer);
if (jobAmount < MAX_JOBS) {
log(`Got WS request for job ${job.msg} with id ${uuid}`, job.num);
acceptJob(uuid, ws);
log(`Got WS request for job ${job.msg} with id ${id}`, job.num);
acceptJob(id, ws);
} else {
log(`Got WS request for job ${job.msg} with id ${uuid}, queued in position ${queue.indexOf(uuid)}`, job.num);
log(`Got WS request for job ${job.msg} with id ${id}, queued in position ${queue.indexOf(id)}`, job.num);
}
} else if (opcode == Tcancel) {
delete queue[queue.indexOf(req) - 1];
jobs.delete(req);
const cancelResponse = Buffer.concat([Buffer.from([Rcancel]), tag]);
ws.send(cancelResponse);
} else if (opcode == Twait) {
const id = msg.readUInt32LE(3);
const job = jobs.get(id);
if (!job) {
const errorResponse = Buffer.concat([Buffer.from([Rerror]), tag, Buffer.from("Invalid job ID")]);
ws.send(errorResponse);
return;
}
if (job.error) {
jobs.delete(id);
const errorResponse = Buffer.concat([Buffer.from([Rerror]), tag, Buffer.from(job.error)]);
ws.send(errorResponse);
return;
}
job.tag = tag;
jobs.set(id, job);
//const waitResponse = Buffer.concat([Buffer.from([Rwait]), tag]);
//ws.send(waitResponse);
} else {
log("Could not parse WS message");
}
@ -117,41 +168,36 @@ httpServer.on("request", async (req, res) => {
return res.end("401 Unauthorized");
}
const reqUrl = new URL(req.url, `http://${req.headers.host}`);
if (reqUrl.pathname === "/status" && req.method === "GET") {
log(`Sending server status to ${req.socket.remoteAddress}:${req.socket.remotePort} via HTTP`);
const statusObject = {
load: MAX_JOBS - jobAmount,
queued: queue.length
};
return res.end(JSON.stringify(statusObject));
} else if (reqUrl.pathname === "/running" && req.method === "GET") {
log(`Sending currently running jobs to ${req.socket.remoteAddress}:${req.socket.remotePort} via HTTP`);
const keys = jobs.keys();
const newObject = { queued: queue.length, runningJobs: jobAmount, max: MAX_JOBS };
for (const key of keys) {
const validKeys = Object.keys(jobs.get(key)).filter((value) => value !== "addr" && value !== "port" && value !== "data" && value !== "ext");
newObject[key] = {};
for (const validKey of validKeys) {
if (validKey === "msg") {
newObject[key][validKey] = JSON.parse(jobs.get(key)[validKey]);
} else {
newObject[key][validKey] = jobs.get(key)[validKey];
}
}
}
return res.end(JSON.stringify(newObject));
} else if (reqUrl.pathname === "/image" && req.method === "GET") {
if (reqUrl.pathname === "/image" && req.method === "GET") {
if (!reqUrl.searchParams.has("id")) {
res.statusCode = 400;
return res.end("400 Bad Request");
}
const id = reqUrl.searchParams.get("id");
const id = parseInt(reqUrl.searchParams.get("id"));
if (!jobs.has(id)) {
res.statusCode = 410;
return res.end("410 Gone");
}
log(`Sending image data for job ${id} to ${req.socket.remoteAddress}:${req.socket.remotePort} via HTTP`);
res.setHeader("ext", jobs.get(id).ext);
const ext = jobs.get(id).ext;
let contentType;
switch (ext) {
case "gif":
contentType = "image/gif";
break;
case "png":
contentType = "image/png";
break;
case "jpeg":
case "jpg":
contentType = "image/jpeg";
break;
case "webp":
contentType = "image/webp";
break;
}
if (contentType) res.setHeader("Content-Type", contentType);
else res.setHeader("Content-Type", ext);
const data = jobs.get(id).data;
jobs.delete(id);
return res.end(data, (err) => {
@ -185,17 +231,17 @@ httpServer.on("error", (e) => {
console.error("An HTTP error occurred: ", e);
});
httpServer.listen(8080, () => {
log("HTTP and WS listening on port 8080");
httpServer.listen(3762, () => {
log("HTTP and WS listening on port 3762");
});
const runJob = (job, sock) => {
const runJob = (job, ws) => {
return new Promise((resolve, reject) => {
log(`Job ${job.uuid} starting...`, job.num);
log(`Job ${job.id} starting...`, job.num);
const object = JSON.parse(job.msg);
// If the image has a path, it must also have a type
if (object.path && !object.type) {
if (object.path && !object.params.type) {
reject(new TypeError("Unknown image type"));
}
@ -206,34 +252,21 @@ const runJob = (job, sock) => {
worker.terminate();
reject(new Error("Job timed out"));
}, 900000);
log(`Job ${job.uuid} started`, job.num);
log(`Job ${job.id} started`, job.num);
worker.once("message", (data) => {
clearTimeout(timeout);
log(`Sending result of job ${job.uuid} back to the bot`, job.num);
const jobObject = jobs.get(job.uuid);
log(`Sending result of job ${job.id} back to the bot`, job.num);
const jobObject = jobs.get(job.id);
jobObject.data = data.buffer;
jobObject.ext = data.fileExtension;
jobs.set(job.uuid, jobObject);
sock.send(Buffer.concat([Buffer.from([0x1]), Buffer.from(job.uuid)]), () => {
return resolve();
});
jobs.set(job.id, jobObject);
const waitResponse = Buffer.concat([Buffer.from([Rwait]), jobObject.tag]);
ws.send(waitResponse);
resolve();
});
worker.once("error", (e) => {
clearTimeout(timeout);
reject(e);
});
/*run(object).then((data) => {
log(`Sending result of job ${job.uuid} back to the bot`, job.num);
const jobObject = jobs.get(job.uuid);
jobObject.data = data.buffer;
jobObject.ext = data.fileExtension;
jobs.set(job.uuid, jobObject);
sock.write(Buffer.concat([Buffer.from([0x1]), Buffer.from(job.uuid)]), (e) => {
if (e) return reject(e);
return resolve();
});
}).catch(e => {
reject(e);
});*/
});
};

View File

@ -47,7 +47,8 @@ class ImageCommand extends Command {
runningCommands.set(this.message.author.id, this.message.createdAt);
const magickParams = {
cmd: this.constructor.command
cmd: this.constructor.command,
params: {}
};
if (this.constructor.requiresImage) {
@ -64,9 +65,9 @@ class ImageCommand extends Command {
return "I've been rate-limited by Tenor. Please try uploading your GIF elsewhere.";
}
magickParams.path = image.path;
magickParams.type = image.type;
magickParams.params.type = image.type;
magickParams.url = image.url; // technically not required but can be useful for text filtering
magickParams.delay = image.delay ? image.delay : 0;
magickParams.params.delay = image.delay ? image.delay : 0;
if (this.constructor.requiresGIF) magickParams.onlyGIF = true;
} catch (e) {
runningCommands.delete(this.message.author.id);
@ -84,15 +85,15 @@ class ImageCommand extends Command {
switch (typeof this.params) {
case "function":
Object.assign(magickParams, this.params(magickParams.url, magickParams.delay));
Object.assign(magickParams.params, this.params(magickParams.url, magickParams.delay));
break;
case "object":
Object.assign(magickParams, this.params);
Object.assign(magickParams.params, this.params);
break;
}
let status;
if (magickParams.type === "image/gif") {
if (magickParams.params.type === "image/gif") {
status = await this.processMessage(this.message);
} else {
this.client.sendChannelTyping(this.message.channel.id);

View File

@ -18,7 +18,7 @@ class ImageStatsCommand extends Command {
for (let i = 0; i < servers.length; i++) {
embed.embeds[0].fields.push({
name: `Server ${i + 1}`,
value: `Running Jobs: ${servers[i].runningJobs}\nQueued: ${servers[i].queued}\nMax Jobs: ${servers[i].max}`
value: `Running Jobs: ${servers[i].runningJobs}\nQueued: ${Math.max(0, servers[i].runningJobs - servers[i].max)}\nMax Jobs: ${servers[i].max}`
});
}
return embed;

View File

@ -1,5 +1,5 @@
import ImageCommand from "../../classes/imageCommand.js";
const allowedFonts = ["futura", "impact", "helvetica", "arial", "roboto", "noto"];
const allowedFonts = ["futura", "impact", "helvetica", "arial", "roboto", "noto", "times"];
class CaptionCommand extends ImageCommand {
params(url) {

View File

@ -1,21 +1,27 @@
import ImageCommand from "../../classes/imageCommand.js";
const words = ["me irl", "dank", "follow my second account @esmBot_", "2016", "meme", "wholesome", "reddit", "instagram", "twitter", "facebook", "fortnite", "minecraft", "relatable", "gold", "funny", "template", "hilarious", "memes", "deep fried", "2020", "leafy", "pewdiepie"];
const allowedFonts = ["futura", "impact", "helvetica", "arial", "roboto", "noto", "times"];
class CaptionTwoCommand extends ImageCommand {
params(url) {
const newArgs = this.args.filter(item => !item.includes(url));
return {
caption: newArgs.length !== 0 ? newArgs.join(" ").replaceAll("&", "\\&amp;").replaceAll(">", "\\&gt;").replaceAll("<", "\\&lt;").replaceAll("\"", "\\&quot;").replaceAll("'", "\\&apos;").replaceAll("%", "\\%") : words.sort(() => 0.5 - Math.random()).slice(0, Math.floor(Math.random() * words.length + 1)).join(" "),
top: !!this.specialArgs.top
top: !!this.specialArgs.top,
font: this.specialArgs.font && allowedFonts.includes(this.specialArgs.font.toLowerCase()) ? this.specialArgs.font.toLowerCase() : "helvetica"
};
}
static description = "Adds a me.me caption/tag list to an image/GIF";
static description = "Adds a me.me caption/tag list to an image";
static aliases = ["tags2", "meirl", "memecaption", "medotmecaption"];
static arguments = ["{text}"];
static flags = [{
name: "top",
description: "Put the caption on the top of an image/GIF instead of the bottom"
description: "Put the caption on the top of an image instead of the bottom"
}, {
name: "font",
type: allowedFonts.join("|"),
description: "Specify the font you want to use (default: `helvetica`)"
}];
static noText = "You need to provide some text to add a caption!";

View File

@ -1,4 +1,5 @@
import ImageCommand from "../../classes/imageCommand.js";
const allowedFonts = ["futura", "impact", "helvetica", "arial", "roboto", "noto", "times"];
class MemeCommand extends ImageCommand {
params(url) {
@ -6,7 +7,8 @@ class MemeCommand extends ImageCommand {
const [topText, bottomText] = newArgs.join(" ").split(/(?<!\\),/).map(elem => elem.trim());
return {
top: (this.specialArgs.case ? topText : topText.toUpperCase()).replaceAll("&", "\\&amp;").replaceAll(">", "\\&gt;").replaceAll("<", "\\&lt;").replaceAll("\"", "\\&quot;").replaceAll("'", "\\&apos;").replaceAll("%", "\\%"),
bottom: bottomText ? (this.specialArgs.case ? bottomText : bottomText.toUpperCase()).replaceAll("&", "\\&amp;").replaceAll(">", "\\&gt;").replaceAll("<", "\\&lt;").replaceAll("\"", "\\&quot;").replaceAll("'", "\\&apos;").replaceAll("%", "\\%") : ""
bottom: bottomText ? (this.specialArgs.case ? bottomText : bottomText.toUpperCase()).replaceAll("&", "\\&amp;").replaceAll(">", "\\&gt;").replaceAll("<", "\\&lt;").replaceAll("\"", "\\&quot;").replaceAll("'", "\\&apos;").replaceAll("%", "\\%") : "",
font: this.specialArgs.font && allowedFonts.includes(this.specialArgs.font.toLowerCase()) ? this.specialArgs.font.toLowerCase() : "impact"
};
}
@ -15,6 +17,10 @@ class MemeCommand extends ImageCommand {
static flags = [{
name: "case",
description: "Make the meme text case-sensitive (allows for lowercase text)"
}, {
name: "font",
type: allowedFonts.join("|"),
description: "Specify the font you want to use (default: `impact`)"
}];
static requiresText = true;

View File

@ -1,4 +1,5 @@
import ImageCommand from "../../classes/imageCommand.js";
const allowedFonts = ["futura", "impact", "helvetica", "arial", "roboto", "noto", "times"];
class MotivateCommand extends ImageCommand {
params(url) {
@ -6,13 +7,19 @@ class MotivateCommand extends ImageCommand {
const [topText, bottomText] = newArgs.join(" ").split(/(?<!\\),/).map(elem => elem.trim());
return {
top: topText.replaceAll("&", "\\&amp;").replaceAll(">", "\\&gt;").replaceAll("<", "\\&lt;").replaceAll("\"", "\\&quot;").replaceAll("'", "\\&apos;").replaceAll("%", "\\%"),
bottom: bottomText ? bottomText.replaceAll("&", "\\&amp;").replaceAll(">", "\\&gt;").replaceAll("<", "\\&lt;").replaceAll("\"", "\\&quot;").replaceAll("'", "\\&apos;").replaceAll("%", "\\%") : ""
bottom: bottomText ? bottomText.replaceAll("&", "\\&amp;").replaceAll(">", "\\&gt;").replaceAll("<", "\\&lt;").replaceAll("\"", "\\&quot;").replaceAll("'", "\\&apos;").replaceAll("%", "\\%") : "",
font: this.specialArgs.font && allowedFonts.includes(this.specialArgs.font.toLowerCase()) ? this.specialArgs.font.toLowerCase() : "times"
};
}
static description = "Generates a motivational poster";
static aliases = ["motivational", "motiv", "demotiv", "demotivational", "poster", "motivation", "demotivate"];
static arguments = ["[top text]", "{bottom text}"];
static flags = [{
name: "font",
type: allowedFonts.join("|"),
description: "Specify the font you want to use (default: `times`)"
}];
static requiresText = true;
static noText = "You need to provide some text to generate a motivational poster!";

View File

@ -15,6 +15,7 @@ Napi::Value CaptionTwo(const Napi::CallbackInfo &info) {
Napi::Buffer<char> data = obj.Get("data").As<Napi::Buffer<char>>();
string caption = obj.Get("caption").As<Napi::String>().Utf8Value();
bool top = obj.Get("top").As<Napi::Boolean>().Value();
string font = obj.Get("font").As<Napi::String>().Utf8Value();
string type = obj.Get("type").As<Napi::String>().Utf8Value();
int delay =
obj.Has("delay") ? obj.Get("delay").As<Napi::Number>().Int32Value() : 0;
@ -39,7 +40,10 @@ Napi::Value CaptionTwo(const Napi::CallbackInfo &info) {
caption_image.fillColor("black");
caption_image.font("Helvetica Neue");
caption_image.fontPointsize(width / 17);
caption_image.read("pango:" + caption);
caption_image.read("pango:<span font_family=\"" +
(font == "roboto" ? "Roboto Condensed" : font) +
"\" weight=\"" + (font != "impact" ? "bold" : "normal") +
"\">" + caption + "</span>");
caption_image.extent(Geometry(width, caption_image.rows() + (width / 25)),
Magick::CenterGravity);

View File

@ -15,6 +15,7 @@ Napi::Value Meme(const Napi::CallbackInfo &info) {
Napi::Buffer<char> data = obj.Get("data").As<Napi::Buffer<char>>();
string top = obj.Get("top").As<Napi::String>().Utf8Value();
string bottom = obj.Get("bottom").As<Napi::String>().Utf8Value();
string font = obj.Get("font").As<Napi::String>().Utf8Value();
string type = obj.Get("type").As<Napi::String>().Utf8Value();
int delay =
obj.Has("delay") ? obj.Get("delay").As<Napi::Number>().Int32Value() : 0;
@ -43,7 +44,10 @@ Napi::Value Meme(const Napi::CallbackInfo &info) {
top_text.font("Impact");
top_text.fontPointsize(width / 12);
top_text.textGravity(Magick::CenterGravity);
top_text.read("pango:<span foreground='white'>" + top + "</span>");
top_text.read("pango:<span font_family=\"" +
(font == "roboto" ? "Roboto Condensed" : font) +
"\" weight=\"" + (font != "impact" ? "bold" : "normal") +
"\" foreground='white'>" + top + "</span>");
Image top_text_fill = top_text;
top_text_fill.channel(Magick::AlphaChannel);
top_text_fill.morphology(Magick::EdgeOutMorphology, "Octagon");

View File

@ -15,6 +15,7 @@ Napi::Value Motivate(const Napi::CallbackInfo &info) {
Napi::Buffer<char> data = obj.Get("data").As<Napi::Buffer<char>>();
string top_text = obj.Get("top").As<Napi::String>().Utf8Value();
string bottom_text = obj.Get("bottom").As<Napi::String>().Utf8Value();
string font = obj.Get("font").As<Napi::String>().Utf8Value();
string type = obj.Get("type").As<Napi::String>().Utf8Value();
int delay =
obj.Has("delay") ? obj.Get("delay").As<Napi::Number>().Int32Value() : 0;
@ -40,7 +41,10 @@ Napi::Value Motivate(const Napi::CallbackInfo &info) {
top.font("Times");
top.textGravity(Magick::CenterGravity);
top.fontPointsize(56);
top.read("pango:<span foreground='white'>" + top_text + "</span>");
top.read("pango:<span font_family=\"" +
(font == "roboto" ? "Roboto Condensed" : font) +
"\" weight=\"" + (font != "impact" ? "bold" : "normal") +
"\" foreground='white'>" + top_text + "</span>");
top.extent(Geometry(bottom_text != "" ? to_string(top.columns()) + "x" +
to_string(top.rows())
: to_string(top.columns()) + "x" +

View File

@ -3,7 +3,7 @@
{ "id": "1", "host": "localhost", "port": 2333, "password": "youshallnotpass", "local": true }
],
"image": [
{ "server": "localhost", "auth": "verycoolpass100" }
{ "server": "localhost", "auth": "verycoolpass100", "tls": false }
],
"searx": [
"https://searx.xyz"

View File

@ -11,7 +11,7 @@ function run(object) {
// If the image has a path, it must also have a type
let promise = new Promise((resolveTest) => { resolveTest(); }); // no-op
if (object.path) {
if (object.type !== "image/gif" && object.onlyGIF) resolve({
if (object.params.type !== "image/gif" && object.onlyGIF) resolve({
buffer: Buffer.alloc(0),
fileExtension: "nogif"
});
@ -20,10 +20,10 @@ function run(object) {
// Convert from a MIME type (e.g. "image/png") to something ImageMagick understands (e.g. "png").
// Don't set `type` directly on the object we are passed as it will be read afterwards.
// If no image type is given (say, the command generates its own image), make it a PNG.
const fileExtension = object.type ? object.type.split("/")[1] : "png";
const fileExtension = object.params.type ? object.params.type.split("/")[1] : "png";
promise.then(buf => {
object.data = buf;
const objectWithFixedType = Object.assign({}, object, {type: fileExtension});
object.params.data = buf;
const objectWithFixedType = Object.assign({}, object.params, {type: fileExtension});
try {
const result = magick[object.cmd](objectWithFixedType);
const returnObject = {

View File

@ -2,7 +2,7 @@ import fetch from "node-fetch";
import fs from "fs";
import fileType from "file-type";
const formats = ["image/jpeg", "image/png", "image/webp", "image/gif", "video/mp4", "video/webm", "video/mov"];
const formats = ["image/jpeg", "image/png", "image/webp", "image/gif", "video/mp4", "video/webm", "video/quicktime"];
export const jobs = {};

167
utils/imageConnection.js Normal file
View File

@ -0,0 +1,167 @@
import fetch from "node-fetch";
import WebSocket from "ws";
import * as logger from "./logger.js";
import { setTimeout } from "timers/promises";
/*
Rerror 0x01
Tqueue 0x02
Rqueue 0x03
Tcancel 0x04
Rcancel 0x05
Twait 0x06
Rwait 0x07
Rinit 0x08
*/
const Rerror = 0x01;
const Tqueue = 0x02;
const Rqueue = 0x03;
const Tcancel = 0x04;
const Rcancel = 0x05;
const Twait = 0x06;
const Rwait = 0x07;
const Rinit = 0x08;
class ImageConnection {
constructor(host, auth, tls = false) {
this.requests = new Map();
this.host = host;
this.port = 3762;
this.auth = auth;
this.tag = null;
this.disconnected = false;
this.njobs = 0;
this.max = 0;
this.formats = {};
this.wsproto = null;
if (tls) {
this.wsproto = "wss";
} else {
this.wsproto = "ws";
}
this.sockurl = `${this.wsproto}://${host}:${this.port}/sock`;
this.conn = new WebSocket(this.sockurl, {
headers: {
"Authentication": auth && auth !== "" ? auth : undefined
}
});
let httpproto;
if (tls) {
httpproto = "https";
} else {
httpproto = "http";
}
this.httpurl = `${httpproto}://${host}:${this.port}/image`;
this.conn.on("message", (msg) => this.onMessage(msg));
this.conn.once("error", (err) => this.onError(err));
this.conn.once("close", () => this.onClose());
}
onMessage(msg) {
const op = msg.readUint8(0);
if (op === Rinit) {
this.max = msg.readUint16LE(3);
this.formats = JSON.parse(msg.toString("utf8", 5));
return;
}
const tag = msg.readUint16LE(1);
const promise = this.requests.get(tag);
this.requests.delete(tag);
if (op === Rqueue) this.njobs++;
if (op === Rcancel || op === Rwait) this.njobs--;
if (op === Rerror) {
this.njobs--;
promise.reject(new Error(msg.slice(3, msg.length).toString()));
return;
}
promise.resolve();
}
onError(e) {
logger.error(e.toString());
}
async onClose() {
for (const promise of this.requests.values()) {
promise.reject(new Error("Request ended prematurely due to a closed connection"));
}
this.requests.clear();
if (!this.disconnected) {
logger.warn(`Lost connection to ${this.host}:${this.port}, attempting to reconnect in 5 seconds...`);
await setTimeout(5000);
this.conn = new WebSocket(this.sockurl, {
headers: {
"Authentication": this.auth
}
});
this.conn.on("message", (msg) => this.onMessage(msg));
this.conn.once("error", (err) => this.onError(err));
this.conn.once("close", () => this.onClose());
}
this.disconnected = false;
}
close() {
this.disconnected = true;
this.conn.close();
}
queue(jobid, jobobj) {
const str = JSON.stringify(jobobj);
const buf = Buffer.alloc(4);
buf.writeUint32LE(jobid);
return this.do(Tqueue, Buffer.concat([buf, Buffer.from(str)]));
}
wait(jobid) {
const buf = Buffer.alloc(4);
buf.writeUint32LE(jobid);
return this.do(Twait, buf);
}
cancel(jobid) {
const buf = Buffer.alloc(4);
buf.writeUint32LE(jobid);
return this.do(Tcancel, buf);
}
async getOutput(jobid) {
const req = await fetch(`${this.httpurl}?id=${jobid}`, {
headers: {
"Authentication": this.auth && this.auth !== "" ? this.auth : undefined
}
});
const contentType = req.headers.get("Content-Type");
let type;
switch (contentType) {
case "image/gif":
type = "gif";
break;
case "image/png":
type = "png";
break;
case "image/jpeg":
type = "jpg";
break;
case "image/webp":
type = "webp";
break;
}
return { buffer: Buffer.from(await req.arrayBuffer()), type };
}
async do(op, data) {
const buf = Buffer.alloc(1 + 2);
let tag = this.tag++;
if (tag > 65535) tag = this.tag = 0;
buf.writeUint8(op);
buf.writeUint16LE(tag, 1);
this.conn.send(Buffer.concat([buf, data]));
const promise = new Promise((resolve, reject) => {
this.requests.set(tag, { resolve, reject });
});
return promise;
}
}
export default ImageConnection;

View File

@ -1,12 +1,11 @@
import { BaseServiceWorker } from "eris-fleet";
import * as logger from "../logger.js";
import fetch from "node-fetch";
import WebSocket from "ws";
import fs from "fs";
import path from "path";
import { fileURLToPath } from "url";
import { Worker } from "worker_threads";
import { EventEmitter } from "events";
import ImageConnection from "../imageConnection.js";
class ImageWorker extends BaseServiceWorker {
constructor(setup) {
@ -16,6 +15,7 @@ class ImageWorker extends BaseServiceWorker {
this.jobs = {};
this.connections = new Map();
this.servers = JSON.parse(fs.readFileSync("./servers.json", { encoding: "utf8" })).image;
this.nextID = 0;
}
this.begin().then(() => this.serviceReady());
@ -41,46 +41,18 @@ class ImageWorker extends BaseServiceWorker {
}
async getRunning() {
let serversLeft = this.connections.size;
const statuses = [];
for (const address of this.connections.keys()) {
const connection = this.connections.get(address);
if (connection.readyState !== 0 && connection.readyState !== 1) {
serversLeft--;
for (const [address, connection] of this.connections) {
if (connection.conn.readyState !== 0 && connection.conn.readyState !== 1) {
continue;
}
const controller = new AbortController(); // eslint-disable-line no-undef
const timeout = setTimeout(() => {
controller.abort();
}, 2000);
try {
const auth = this.servers.filter((val) => val.server === address)[0].auth;
const statusRequest = await fetch(`http://${address}:8080/running`, {
signal: controller.signal,
headers: {
"Authentication": auth && auth !== "" ? auth : undefined
}
});
clearTimeout(timeout);
const status = await statusRequest.json();
serversLeft--;
statuses.push(status);
} catch (e) {
if (e.name === "AbortError") {
serversLeft--;
continue;
} else if (e.code === "ECONNREFUSED") {
serversLeft--;
continue;
}
throw e;
}
}
if (!serversLeft) {
return statuses;
} else {
throw new Error("Loop ended before all servers could be checked");
statuses.push({
address,
runningJobs: connection.njobs,
max: connection.max
});
}
return statuses;
}
async chooseServer(ideal) {
@ -89,119 +61,28 @@ class ImageWorker extends BaseServiceWorker {
return b.load - a.load;
}).filter((e, i, array) => {
return !(e.load < array[0].load);
}).sort((a, b) => {
return a.queued - b.queued;
});
return sorted[0];
}
async getIdeal() {
let serversLeft = this.connections.size;
if (serversLeft < this.servers.length) {
for (const server of this.servers) {
try {
if (!this.connections.has(server.server)) await this.connect(server.server, server.auth);
} catch (e) {
logger.error(e);
}
}
serversLeft = this.connections.size;
}
async getIdeal(object) {
const idealServers = [];
for (const address of this.connections.keys()) {
const connection = this.connections.get(address);
if (connection.readyState !== 0 && connection.readyState !== 1) {
serversLeft--;
for (const [address, connection] of this.connections) {
if (connection.conn.readyState !== 0 && connection.conn.readyState !== 1) {
continue;
}
const controller = new AbortController(); // eslint-disable-line no-undef
const timeout = setTimeout(() => {
controller.abort();
}, 5000);
try {
const auth = this.servers.filter((val) => val.server === address)[0].auth;
const statusRequest = await fetch(`http://${address}:8080/status`, {
signal: controller.signal,
headers: {
"Authentication": auth && auth !== "" ? auth : undefined
}
});
clearTimeout(timeout);
const status = await statusRequest.json();
serversLeft--;
idealServers.push({
addr: address,
load: status.load,
queued: status.queued
});
} catch (e) {
if (e.name === "AbortError") {
serversLeft--;
continue;
} else if (e.code === "ECONNREFUSED") {
serversLeft--;
continue;
}
throw e;
} finally {
clearTimeout(timeout);
}
}
if (!serversLeft) {
const server = await this.chooseServer(idealServers);
return { addr: server.addr, sock: this.connections.get(server.addr) };
} else {
throw new Error("Loop ended before all servers could be checked");
if (object.params.type && !connection.formats[object.cmd].includes(object.params.type)) continue;
idealServers.push({
addr: address,
load: connection.njobs / connection.max
});
}
const server = await this.chooseServer(idealServers);
return this.connections.get(server.addr);
}
async connect(server, auth) {
const connection = new WebSocket(`ws://${server}:8080/sock`, {
headers: {
"Authentication": auth && auth !== "" ? auth : undefined
}
});
connection.on("message", async (msg) => {
const opcode = msg.readUint8(0);
const req = msg.slice(37, msg.length);
const uuid = msg.slice(1, 37).toString();
if (opcode === 0x00) { // Job queued
if (this.jobs[req]) {
this.jobs[req].event.emit("uuid", uuid);
}
} else if (opcode === 0x01) { // Job completed successfully
// the image API sends all job responses over the same socket; make sure this is ours
if (this.jobs[uuid]) {
const imageReq = await fetch(`http://${server}:8080/image?id=${uuid}`, {
headers: {
"Authentication": auth && auth !== "" ? auth : undefined
}
});
const image = Buffer.from(await imageReq.arrayBuffer());
// The response data is given as the file extension/ImageMagick type of the image (e.g. "png"), followed
// by a newline, followed by the image data.
this.jobs[uuid].event.emit("image", image, imageReq.headers.get("ext"));
}
} else if (opcode === 0x02) { // Job errored
if (this.jobs[uuid]) {
this.jobs[uuid].event.emit("error", new Error(req));
}
}
});
connection.on("error", (e) => {
logger.error(e.toString());
});
connection.once("close", () => {
for (const uuid of Object.keys(this.jobs)) {
if (this.jobs[uuid].addr === server) {
this.jobs[uuid].event.emit("error", "Job ended prematurely due to a closed connection; please run your image job again");
delete this.jobs[uuid];
}
}
//logger.log(`Lost connection to ${server}, attempting to reconnect...`);
this.connections.delete(server);
});
const connection = new ImageConnection(server, auth);
this.connections.set(server, connection);
}
@ -209,70 +90,38 @@ class ImageWorker extends BaseServiceWorker {
for (const connection of this.connections.values()) {
connection.close();
}
for (const uuid of Object.keys(this.jobs)) {
this.jobs[uuid].event.emit("error", "Job ended prematurely (not really an error; just run your image job again)");
delete this.jobs[uuid];
}
this.connections.clear();
return;
}
async start(object, num) {
const currentServer = await this.getIdeal();
const data = Buffer.concat([Buffer.from([0x01 /* queue job */]), Buffer.from(num.length.toString()), Buffer.from(num), Buffer.from(JSON.stringify(object))]);
currentServer.sock.send(data);
const event = new EventEmitter();
this.jobs[num] = { event, addr: currentServer.addr };
const uuid = await new Promise((resolve, reject) => {
event.once("uuid", (uuid) => resolve(uuid));
event.once("error", reject);
waitForWorker(worker) {
return new Promise((resolve, reject) => {
worker.once("message", (data) => {
resolve({
buffer: Buffer.from([...data.buffer]),
type: data.fileExtension
});
});
worker.once("error", reject);
});
delete this.jobs[num];
this.jobs[uuid] = { event: event, addr: currentServer.addr };
return { uuid: uuid, event: event };
}
run(object) {
return new Promise((resolve, reject) => {
if (process.env.API === "true") {
// Connect to best image server
const num = Math.floor(Math.random() * 100000).toString().slice(0, 5);
const timeout = setTimeout(() => {
if (this.jobs[num]) delete this.jobs[num];
reject("The image request timed out after 25 seconds. Try uploading your image elsewhere.");
}, 25000);
this.start(object, num).then((data) => {
clearTimeout(timeout);
if (!data.event) reject("Not connected to image server");
data.event.once("image", (image, type) => {
delete this.jobs[data.uuid];
const payload = {
// Take just the image data
buffer: image,
type: type
};
resolve(payload);
});
data.event.once("error", (err) => {
delete this.jobs[data.uuid];
reject(err);
});
return;
}).catch(err => reject(err));
} else {
// Called from command (not using image API)
const worker = new Worker(path.join(path.dirname(fileURLToPath(import.meta.url)), "../image-runner.js"), {
workerData: object
});
worker.once("message", (data) => {
resolve({
buffer: Buffer.from([...data.buffer]),
type: data.fileExtension
});
});
worker.once("error", reject);
}
});
async run(object) {
if (process.env.API === "true") {
let num = this.nextID++;
if (num > 4294967295) num = this.nextID = 0;
const currentServer = await this.getIdeal(object);
await currentServer.queue(num, object);
await currentServer.wait(num);
const output = await currentServer.getOutput(num);
return output;
} else {
// Called from command (not using image API)
const worker = new Worker(path.join(path.dirname(fileURLToPath(import.meta.url)), "../image-runner.js"), {
workerData: object
});
return await this.waitForWorker(worker);
}
}
async handleCommand(data) {