initial update

This commit is contained in:
murm 2023-03-15 10:09:09 -04:00
parent 3272429cf6
commit db9b70bf66
280 changed files with 11772 additions and 11966 deletions

View file

@ -1,51 +1,51 @@
# esmBot Image API
The esmBot image API is a combined HTTP and WebSocket API. The default port to access the API is 3762. The API supports very basic authentication, which is defined on the server via the PASS environment variable and is sent from the client via the Authentication header in both HTTP and WS requests.
## HTTP
### GET `/image/?id=<job id>`
Get image data after job is finished running. The Content-Type header is properly set.
### GET `/count`
Get the current amount of running jobs. Response is a plaintext number value.
## WebSockets
A client sends *requests* (T-messages) to a server, which subsequently *replies* (R-messages) to the client.
### Message IDs
- Rerror 0x01
- Tqueue 0x02
- Rqueue 0x03
- Tcancel 0x04
- Rcancel 0x05
- Twait 0x06
- Rwait 0x07
- Rinit 0x08
### Messages
[n] means n bytes.
[s] means a string that goes until the end of the message.
[j] means JSON data that goes until the end of the message.
`tag` is used to identify a request/response pair, like `lock` in the original API. `jid` is used to identify a job. `job` is a job object.
- Rerror tag[2] error[s]
- Tqueue tag[2] jid[8] job[j]
- Rqueue tag[2]
- Tcancel tag[2] jid[8]
- Rcancel tag[2]
- Twait tag[2] jid[8]
- Rwait tag[2]
- Rinit tag[2] max_jobs[2] running_jobs[2] formats[j]
### Job Object
The job object is formatted like this:
```js
{
"cmd": string, // name of internal image command, e.g. caption
"path": string, // canonical image URL, used for getting the actual image
"url": string, // original image URL, used for message filtering
"params": { // content varies depending on the command, some common parameters are listed here
"type": string, // mime type of output, should usually be the same as input
...
},
"name": string // filename of the image, without extension
}
```
# esmBot Image API
The esmBot image API is a combined HTTP and WebSocket API. The default port to access the API is 3762. The API supports very basic authentication, which is defined on the server via the PASS environment variable and is sent from the client via the Authentication header in both HTTP and WS requests.
## HTTP
### GET `/image/?id=<job id>`
Get image data after job is finished running. The Content-Type header is properly set.
### GET `/count`
Get the current amount of running jobs. Response is a plaintext number value.
## WebSockets
A client sends *requests* (T-messages) to a server, which subsequently *replies* (R-messages) to the client.
### Message IDs
- Rerror 0x01
- Tqueue 0x02
- Rqueue 0x03
- Tcancel 0x04
- Rcancel 0x05
- Twait 0x06
- Rwait 0x07
- Rinit 0x08
### Messages
[n] means n bytes.
[s] means a string that goes until the end of the message.
[j] means JSON data that goes until the end of the message.
`tag` is used to identify a request/response pair, like `lock` in the original API. `jid` is used to identify a job. `job` is a job object.
- Rerror tag[2] error[s]
- Tqueue tag[2] jid[8] job[j]
- Rqueue tag[2]
- Tcancel tag[2] jid[8]
- Rcancel tag[2]
- Twait tag[2] jid[8]
- Rwait tag[2]
- Rinit tag[2] max_jobs[2] running_jobs[2] formats[j]
### Job Object
The job object is formatted like this:
```js
{
"cmd": string, // name of internal image command, e.g. caption
"path": string, // canonical image URL, used for getting the actual image
"url": string, // original image URL, used for message filtering
"params": { // content varies depending on the command, some common parameters are listed here
"type": string, // mime type of output, should usually be the same as input
...
},
"name": string // filename of the image, without extension
}
```

View file

@ -1,290 +1,290 @@
import { config } from "dotenv";
config();
import { cpus } from "os";
import { Worker } from "worker_threads";
import { join } from "path";
import { createServer } from "http";
import { WebSocketServer } from "ws";
import { fileURLToPath } from "url";
import { dirname } from "path";
import { createRequire } from "module";
import EventEmitter from "events";
const nodeRequire = createRequire(import.meta.url);
const img = nodeRequire(`../build/${process.env.DEBUG && process.env.DEBUG === "true" ? "Debug" : "Release"}/image.node`);
const Rerror = 0x01;
const Tqueue = 0x02;
const Rqueue = 0x03;
const Tcancel = 0x04;
const Rcancel = 0x05;
const Twait = 0x06;
const Rwait = 0x07;
const Rinit = 0x08;
const start = process.hrtime();
const log = (msg, jobNum) => {
console.log(`[${process.hrtime(start)[1] / 1000000}${jobNum ? `:${jobNum}` : ""}]\t ${msg}`);
};
const error = (msg, jobNum) => {
console.error(`[${process.hrtime(start)[1] / 1000000}${jobNum ? `:${jobNum}` : ""}]\t ${msg}`);
};
class JobCache extends Map {
set(key, value) {
super.set(key, value);
setTimeout(() => {
if (super.has(key) && this.get(key) === value && value.data) super.delete(key);
}, 300000); // delete jobs if not requested after 5 minutes
}
}
const jobs = new JobCache();
// Should look like ID : { msg: "request", num: <job number> }
const queue = [];
// Array of IDs
const MAX_JOBS = process.env.JOBS ? parseInt(process.env.JOBS) : cpus().length * 4; // Completely arbitrary, should usually be some multiple of your amount of cores
const PASS = process.env.PASS ? process.env.PASS : undefined;
let jobAmount = 0;
const acceptJob = (id, sock) => {
jobAmount++;
queue.shift();
const job = jobs.get(id);
return runJob({
id: id,
msg: job.msg,
num: job.num
}, sock).then(() => {
log(`Job ${id} has finished`);
}).catch((err) => {
error(`Error on job ${id}: ${err}`, job.num);
const newJob = jobs.get(id);
if (!newJob.tag) {
newJob.error = err.message;
jobs.set(id, newJob);
return;
}
jobs.delete(id);
sock.send(Buffer.concat([Buffer.from([Rerror]), newJob.tag, Buffer.from(err.message)]));
}).finally(() => {
jobAmount--;
if (queue.length > 0) {
acceptJob(queue[0], sock);
}
});
};
const waitForVerify = (event) => {
return new Promise((resolve, reject) => {
event.once("end", (r) => resolve(r));
event.once("error", (e) => reject(e));
});
};
const wss = new WebSocketServer({ clientTracking: true, noServer: true });
wss.on("connection", (ws, request) => {
log(`WS client ${request.socket.remoteAddress}:${request.socket.remotePort} has connected`);
const num = Buffer.alloc(2);
num.writeUInt16LE(MAX_JOBS);
const cur = Buffer.alloc(2);
cur.writeUInt16LE(jobAmount);
const formats = {};
for (const cmd of img.funcs) {
formats[cmd] = ["image/png", "image/gif", "image/jpeg", "image/webp"];
}
const init = Buffer.concat([Buffer.from([Rinit]), Buffer.from([0x00, 0x00]), num, cur, Buffer.from(JSON.stringify(formats))]);
ws.send(init);
ws.on("error", (err) => {
error(err);
});
ws.on("message", (msg) => {
const opcode = msg.readUint8(0);
const tag = msg.slice(1, 3);
const req = msg.toString().slice(3);
if (opcode == Tqueue) {
const id = msg.readBigInt64LE(3);
const obj = msg.slice(11);
const job = { msg: obj, num: jobAmount, verifyEvent: new EventEmitter() };
jobs.set(id, job);
queue.push(id);
const newBuffer = Buffer.concat([Buffer.from([Rqueue]), tag]);
ws.send(newBuffer);
if (jobAmount < MAX_JOBS) {
log(`Got WS request for job ${job.msg} with id ${id}`, job.num);
acceptJob(id, ws);
} else {
log(`Got WS request for job ${job.msg} with id ${id}, queued in position ${queue.indexOf(id)}`, job.num);
}
} else if (opcode == Tcancel) {
delete queue[queue.indexOf(req) - 1];
jobs.delete(req);
const cancelResponse = Buffer.concat([Buffer.from([Rcancel]), tag]);
ws.send(cancelResponse);
} else if (opcode == Twait) {
const id = msg.readBigUInt64LE(3);
const job = jobs.get(id);
if (!job) {
const errorResponse = Buffer.concat([Buffer.from([Rerror]), tag, Buffer.from("Invalid job ID")]);
ws.send(errorResponse);
return;
}
if (job.error) {
job.verifyEvent.emit("error", job.error);
jobs.delete(id);
const errorResponse = Buffer.concat([Buffer.from([Rerror]), tag, Buffer.from(job.error)]);
ws.send(errorResponse);
return;
}
job.verifyEvent.emit("end", tag);
job.tag = tag;
jobs.set(id, job);
//const waitResponse = Buffer.concat([Buffer.from([Rwait]), tag]);
//ws.send(waitResponse);
} else {
log("Could not parse WS message");
}
});
ws.on("close", () => {
log(`WS client ${request.socket.remoteAddress}:${request.socket.remotePort} has disconnected`);
});
});
wss.on("error", (err) => {
error("A WS error occurred: ", err);
});
const httpServer = createServer();
httpServer.on("request", async (req, res) => {
if (req.method !== "GET") {
res.statusCode = 405;
return res.end("405 Method Not Allowed");
}
if (PASS && req.headers.authentication !== PASS) {
res.statusCode = 401;
return res.end("401 Unauthorized");
}
const reqUrl = new URL(req.url, `http://${req.headers.host}`);
if (reqUrl.pathname === "/image" && req.method === "GET") {
if (!reqUrl.searchParams.has("id")) {
res.statusCode = 400;
return res.end("400 Bad Request");
}
const id = BigInt(reqUrl.searchParams.get("id"));
if (!jobs.has(id)) {
res.statusCode = 410;
return res.end("410 Gone");
}
log(`Sending image data for job ${id} to ${req.socket.remoteAddress}:${req.socket.remotePort} via HTTP`);
const ext = jobs.get(id).ext;
let contentType;
switch (ext) {
case "gif":
contentType = "image/gif";
break;
case "png":
contentType = "image/png";
break;
case "jpeg":
case "jpg":
contentType = "image/jpeg";
break;
case "webp":
contentType = "image/webp";
break;
}
if (contentType) res.setHeader("Content-Type", contentType);
else res.setHeader("Content-Type", ext);
const data = jobs.get(id).data;
jobs.delete(id);
return res.end(data, (err) => {
if (err) error(err);
});
} else if (reqUrl.pathname === "/count" && req.method === "GET") {
log(`Sending job count to ${req.socket.remoteAddress}:${req.socket.remotePort} via HTTP`);
return res.end(jobAmount.toString(), (err) => {
if (err) error(err);
});
} else {
res.statusCode = 404;
return res.end("404 Not Found");
}
});
httpServer.on("upgrade", (req, sock, head) => {
const reqUrl = new URL(req.url, `http://${req.headers.host}`);
if (PASS && req.headers.authentication !== PASS) {
sock.write("HTTP/1.1 401 Unauthorized\r\n\r\n");
sock.destroy();
return;
}
if (reqUrl.pathname === "/sock") {
wss.handleUpgrade(req, sock, head, (ws) => {
wss.emit("connection", ws, req);
});
} else {
sock.destroy();
}
});
httpServer.on("error", (e) => {
error("An HTTP error occurred: ", e);
});
const port = parseInt(process.env.PORT) || 3762;
httpServer.listen(port, () => {
log("HTTP and WS listening on port 3762");
});
const runJob = (job, ws) => {
return new Promise((resolve, reject) => {
log(`Job ${job.id} starting...`, job.num);
const object = JSON.parse(job.msg);
// If the image has a path, it must also have a type
if (object.path && !object.params.type) {
reject(new TypeError("Unknown image type"));
}
const worker = new Worker(join(dirname(fileURLToPath(import.meta.url)), "../utils/image-runner.js"), {
workerData: object
});
const timeout = setTimeout(() => {
worker.terminate();
reject(new Error("Job timed out"));
}, 900000);
log(`Job ${job.id} started`, job.num);
worker.once("message", (data) => {
clearTimeout(timeout);
log(`Sending result of job ${job.id} back to the bot`, job.num);
const jobObject = jobs.get(job.id);
jobObject.data = data.buffer;
jobObject.ext = data.fileExtension;
let verifyPromise;
if (!jobObject.tag) {
verifyPromise = waitForVerify(jobObject.verifyEvent);
} else {
verifyPromise = Promise.resolve(jobObject.tag);
}
verifyPromise.then(tag => {
jobs.set(job.id, jobObject);
const waitResponse = Buffer.concat([Buffer.from([Rwait]), tag]);
ws.send(waitResponse);
resolve();
});
});
worker.once("error", (e) => {
clearTimeout(timeout);
reject(e);
});
});
};
import { config } from "dotenv";
config();
import { cpus } from "os";
import { Worker } from "worker_threads";
import { join } from "path";
import { createServer } from "http";
import { WebSocketServer } from "ws";
import { fileURLToPath } from "url";
import { dirname } from "path";
import { createRequire } from "module";
import EventEmitter from "events";
const nodeRequire = createRequire(import.meta.url);
const img = nodeRequire(`../build/${process.env.DEBUG && process.env.DEBUG === "true" ? "Debug" : "Release"}/image.node`);
const Rerror = 0x01;
const Tqueue = 0x02;
const Rqueue = 0x03;
const Tcancel = 0x04;
const Rcancel = 0x05;
const Twait = 0x06;
const Rwait = 0x07;
const Rinit = 0x08;
const start = process.hrtime();
const log = (msg, jobNum) => {
console.log(`[${process.hrtime(start)[1] / 1000000}${jobNum ? `:${jobNum}` : ""}]\t ${msg}`);
};
const error = (msg, jobNum) => {
console.error(`[${process.hrtime(start)[1] / 1000000}${jobNum ? `:${jobNum}` : ""}]\t ${msg}`);
};
class JobCache extends Map {
set(key, value) {
super.set(key, value);
setTimeout(() => {
if (super.has(key) && this.get(key) === value && value.data) super.delete(key);
}, 300000); // delete jobs if not requested after 5 minutes
}
}
const jobs = new JobCache();
// Should look like ID : { msg: "request", num: <job number> }
const queue = [];
// Array of IDs
const MAX_JOBS = process.env.JOBS ? parseInt(process.env.JOBS) : cpus().length * 4; // Completely arbitrary, should usually be some multiple of your amount of cores
const PASS = process.env.PASS ? process.env.PASS : undefined;
let jobAmount = 0;
const acceptJob = (id, sock) => {
jobAmount++;
queue.shift();
const job = jobs.get(id);
return runJob({
id: id,
msg: job.msg,
num: job.num
}, sock).then(() => {
log(`Job ${id} has finished`);
}).catch((err) => {
error(`Error on job ${id}: ${err}`, job.num);
const newJob = jobs.get(id);
if (!newJob.tag) {
newJob.error = err.message;
jobs.set(id, newJob);
return;
}
jobs.delete(id);
sock.send(Buffer.concat([Buffer.from([Rerror]), newJob.tag, Buffer.from(err.message)]));
}).finally(() => {
jobAmount--;
if (queue.length > 0) {
acceptJob(queue[0], sock);
}
});
};
const waitForVerify = (event) => {
return new Promise((resolve, reject) => {
event.once("end", (r) => resolve(r));
event.once("error", (e) => reject(e));
});
};
const wss = new WebSocketServer({ clientTracking: true, noServer: true });
wss.on("connection", (ws, request) => {
log(`WS client ${request.socket.remoteAddress}:${request.socket.remotePort} has connected`);
const num = Buffer.alloc(2);
num.writeUInt16LE(MAX_JOBS);
const cur = Buffer.alloc(2);
cur.writeUInt16LE(jobAmount);
const formats = {};
for (const cmd of img.funcs) {
formats[cmd] = ["image/png", "image/gif", "image/jpeg", "image/webp"];
}
const init = Buffer.concat([Buffer.from([Rinit]), Buffer.from([0x00, 0x00]), num, cur, Buffer.from(JSON.stringify(formats))]);
ws.send(init);
ws.on("error", (err) => {
error(err);
});
ws.on("message", (msg) => {
const opcode = msg.readUint8(0);
const tag = msg.slice(1, 3);
const req = msg.toString().slice(3);
if (opcode == Tqueue) {
const id = msg.readBigInt64LE(3);
const obj = msg.slice(11);
const job = { msg: obj, num: jobAmount, verifyEvent: new EventEmitter() };
jobs.set(id, job);
queue.push(id);
const newBuffer = Buffer.concat([Buffer.from([Rqueue]), tag]);
ws.send(newBuffer);
if (jobAmount < MAX_JOBS) {
log(`Got WS request for job ${job.msg} with id ${id}`, job.num);
acceptJob(id, ws);
} else {
log(`Got WS request for job ${job.msg} with id ${id}, queued in position ${queue.indexOf(id)}`, job.num);
}
} else if (opcode == Tcancel) {
delete queue[queue.indexOf(req) - 1];
jobs.delete(req);
const cancelResponse = Buffer.concat([Buffer.from([Rcancel]), tag]);
ws.send(cancelResponse);
} else if (opcode == Twait) {
const id = msg.readBigUInt64LE(3);
const job = jobs.get(id);
if (!job) {
const errorResponse = Buffer.concat([Buffer.from([Rerror]), tag, Buffer.from("Invalid job ID")]);
ws.send(errorResponse);
return;
}
if (job.error) {
job.verifyEvent.emit("error", job.error);
jobs.delete(id);
const errorResponse = Buffer.concat([Buffer.from([Rerror]), tag, Buffer.from(job.error)]);
ws.send(errorResponse);
return;
}
job.verifyEvent.emit("end", tag);
job.tag = tag;
jobs.set(id, job);
//const waitResponse = Buffer.concat([Buffer.from([Rwait]), tag]);
//ws.send(waitResponse);
} else {
log("Could not parse WS message");
}
});
ws.on("close", () => {
log(`WS client ${request.socket.remoteAddress}:${request.socket.remotePort} has disconnected`);
});
});
wss.on("error", (err) => {
error("A WS error occurred: ", err);
});
const httpServer = createServer();
httpServer.on("request", async (req, res) => {
if (req.method !== "GET") {
res.statusCode = 405;
return res.end("405 Method Not Allowed");
}
if (PASS && req.headers.authentication !== PASS) {
res.statusCode = 401;
return res.end("401 Unauthorized");
}
const reqUrl = new URL(req.url, `http://${req.headers.host}`);
if (reqUrl.pathname === "/image" && req.method === "GET") {
if (!reqUrl.searchParams.has("id")) {
res.statusCode = 400;
return res.end("400 Bad Request");
}
const id = BigInt(reqUrl.searchParams.get("id"));
if (!jobs.has(id)) {
res.statusCode = 410;
return res.end("410 Gone");
}
log(`Sending image data for job ${id} to ${req.socket.remoteAddress}:${req.socket.remotePort} via HTTP`);
const ext = jobs.get(id).ext;
let contentType;
switch (ext) {
case "gif":
contentType = "image/gif";
break;
case "png":
contentType = "image/png";
break;
case "jpeg":
case "jpg":
contentType = "image/jpeg";
break;
case "webp":
contentType = "image/webp";
break;
}
if (contentType) res.setHeader("Content-Type", contentType);
else res.setHeader("Content-Type", ext);
const data = jobs.get(id).data;
jobs.delete(id);
return res.end(data, (err) => {
if (err) error(err);
});
} else if (reqUrl.pathname === "/count" && req.method === "GET") {
log(`Sending job count to ${req.socket.remoteAddress}:${req.socket.remotePort} via HTTP`);
return res.end(jobAmount.toString(), (err) => {
if (err) error(err);
});
} else {
res.statusCode = 404;
return res.end("404 Not Found");
}
});
httpServer.on("upgrade", (req, sock, head) => {
const reqUrl = new URL(req.url, `http://${req.headers.host}`);
if (PASS && req.headers.authentication !== PASS) {
sock.write("HTTP/1.1 401 Unauthorized\r\n\r\n");
sock.destroy();
return;
}
if (reqUrl.pathname === "/sock") {
wss.handleUpgrade(req, sock, head, (ws) => {
wss.emit("connection", ws, req);
});
} else {
sock.destroy();
}
});
httpServer.on("error", (e) => {
error("An HTTP error occurred: ", e);
});
const port = parseInt(process.env.PORT) || 3762;
httpServer.listen(port, () => {
log("HTTP and WS listening on port 3762");
});
const runJob = (job, ws) => {
return new Promise((resolve, reject) => {
log(`Job ${job.id} starting...`, job.num);
const object = JSON.parse(job.msg);
// If the image has a path, it must also have a type
if (object.path && !object.params.type) {
reject(new TypeError("Unknown image type"));
}
const worker = new Worker(join(dirname(fileURLToPath(import.meta.url)), "../utils/image-runner.js"), {
workerData: object
});
const timeout = setTimeout(() => {
worker.terminate();
reject(new Error("Job timed out"));
}, 900000);
log(`Job ${job.id} started`, job.num);
worker.once("message", (data) => {
clearTimeout(timeout);
log(`Sending result of job ${job.id} back to the bot`, job.num);
const jobObject = jobs.get(job.id);
jobObject.data = data.buffer;
jobObject.ext = data.fileExtension;
let verifyPromise;
if (!jobObject.tag) {
verifyPromise = waitForVerify(jobObject.verifyEvent);
} else {
verifyPromise = Promise.resolve(jobObject.tag);
}
verifyPromise.then(tag => {
jobs.set(job.id, jobObject);
const waitResponse = Buffer.concat([Buffer.from([Rwait]), tag]);
ws.send(waitResponse);
resolve();
});
});
worker.once("error", (e) => {
clearTimeout(timeout);
reject(e);
});
});
};