Replace eris-fleet with a pm2-based cluster system, overhaul image handling, removed azure image api

This commit is contained in:
Essem 2022-09-21 00:05:03 -05:00
parent 5a3364736d
commit db0decf71a
No known key found for this signature in database
GPG key ID: 7D497397CC3A2A8C
45 changed files with 1777 additions and 857 deletions

View file

@ -37,11 +37,7 @@ This page was last generated on \`${new Date().toString()}\`.
\`[]\` means an argument is required, \`{}\` means an argument is optional.
Default prefix is \`&\`.
**Want to help support esmBot's development? Consider donating on Patreon!** https://patreon.com/TheEssem
> Tip: You can get much more info about a command by using \`help [command]\` in the bot itself.
`;
template += "\n## Table of Contents\n";

View file

@ -1,14 +1,22 @@
import { request } from "undici";
import fs from "fs";
import path from "path";
import { fileURLToPath } from "url";
import { Worker } from "worker_threads";
import { createRequire } from "module";
import { fileTypeFromBuffer, fileTypeFromFile } from "file-type";
import * as logger from "./logger.js";
import ImageConnection from "./imageConnection.js";
// only requiring this to work around an issue regarding worker threads
const nodeRequire = createRequire(import.meta.url);
if (!process.env.API_TYPE || process.env.API_TYPE === "none") {
nodeRequire(`../build/${process.env.DEBUG && process.env.DEBUG === "true" ? "Debug" : "Release"}/image.node`);
}
const formats = ["image/jpeg", "image/png", "image/webp", "image/gif", "video/mp4", "video/webm", "video/quicktime"];
export const jobs = {};
export const connections = new Map();
export const servers = JSON.parse(fs.readFileSync(new URL("../config/servers.json", import.meta.url), { encoding: "utf8" })).image;
export let servers = process.env.API_TYPE === "ws" ? JSON.parse(fs.readFileSync(new URL("../config/servers.json", import.meta.url), { encoding: "utf8" })).image : null;
export async function getType(image, extraReturnTypes) {
if (!image.startsWith("http")) {
@ -65,3 +73,98 @@ export async function getType(image, extraReturnTypes) {
}
return type;
}
function connect(server, auth) {
const connection = new ImageConnection(server, auth);
connections.set(server, connection);
}
function disconnect() {
for (const connection of connections.values()) {
connection.close();
}
connections.clear();
}
async function repopulate() {
const data = await fs.promises.readFile(new URL("../config/servers.json", import.meta.url), { encoding: "utf8" });
servers = JSON.parse(data).image;
}
export async function reloadImageConnections() {
disconnect();
await repopulate();
let amount = 0;
for (const server of servers) {
try {
connect(server.server, server.auth);
amount += 1;
} catch (e) {
logger.error(e);
}
}
return amount;
}
function chooseServer(ideal) {
if (ideal.length === 0) throw "No available servers";
const sorted = ideal.sort((a, b) => {
return a.load - b.load;
});
return sorted[0];
}
async function getIdeal(object) {
const idealServers = [];
for (const [address, connection] of connections) {
if (connection.conn.readyState !== 0 && connection.conn.readyState !== 1) {
continue;
}
if (object.params.type && !connection.formats[object.cmd]?.includes(object.params.type)) continue;
idealServers.push({
addr: address,
load: await connection.getCount()
});
}
const server = chooseServer(idealServers);
return connections.get(server.addr);
}
function waitForWorker(worker) {
return new Promise((resolve, reject) => {
worker.once("message", (data) => {
resolve({
buffer: Buffer.from([...data.buffer]),
type: data.fileExtension
});
});
worker.once("error", reject);
});
}
export async function runImageJob(params) {
if (process.env.API_TYPE === "ws") {
for (let i = 0; i < 3; i++) {
const currentServer = await getIdeal(params);
try {
await currentServer.queue(BigInt(params.id), params);
await currentServer.wait(BigInt(params.id));
const output = await currentServer.getOutput(params.id);
return output;
} catch (e) {
if (i < 2 && e === "Request ended prematurely due to a closed connection") {
continue;
} else {
if (e === "No available servers" && i >= 2) throw "Request ended prematurely due to a closed connection";
throw e;
}
}
}
} else {
// Called from command (not using image API)
const worker = new Worker(path.join(path.dirname(fileURLToPath(import.meta.url)), "./image-runner.js"), {
workerData: params
});
return await waitForWorker(worker);
}
}

View file

@ -22,8 +22,6 @@ class ImageConnection {
this.auth = auth;
this.tag = 0;
this.disconnected = false;
this.njobs = 0;
this.max = 0;
this.formats = {};
this.wsproto = null;
if (tls) {
@ -43,17 +41,15 @@ class ImageConnection {
} else {
httpproto = "http";
}
this.httpurl = `${httpproto}://${host}/image`;
this.httpurl = `${httpproto}://${host}`;
this.conn.on("message", (msg) => this.onMessage(msg));
this.conn.once("error", (err) => this.onError(err));
this.conn.once("close", () => this.onClose());
}
onMessage(msg) {
async onMessage(msg) {
const op = msg.readUint8(0);
if (op === Rinit) {
this.max = msg.readUint16LE(3);
this.njobs = msg.readUint16LE(5);
this.formats = JSON.parse(msg.toString("utf8", 7));
return;
}
@ -64,10 +60,7 @@ class ImageConnection {
return;
}
this.requests.delete(tag);
if (op === Rqueue) this.njobs++;
if (op === Rcancel || op === Rwait) this.njobs--;
if (op === Rerror) {
this.njobs--;
promise.reject(new Error(msg.slice(3, msg.length).toString()));
return;
}
@ -82,9 +75,7 @@ class ImageConnection {
for (const [tag, obj] of this.requests.entries()) {
obj.reject("Request ended prematurely due to a closed connection");
this.requests.delete(tag);
if (obj.op === Twait || obj.op === Tcancel) this.njobs--;
}
//this.requests.clear();
if (!this.disconnected) {
logger.warn(`Lost connection to ${this.host}, attempting to reconnect in 5 seconds...`);
await setTimeout(5000);
@ -107,25 +98,25 @@ class ImageConnection {
queue(jobid, jobobj) {
const str = JSON.stringify(jobobj);
const buf = Buffer.alloc(4);
buf.writeUint32LE(jobid);
const buf = Buffer.alloc(8);
buf.writeBigUint64LE(jobid);
return this.do(Tqueue, jobid, Buffer.concat([buf, Buffer.from(str)]));
}
wait(jobid) {
const buf = Buffer.alloc(4);
buf.writeUint32LE(jobid);
const buf = Buffer.alloc(8);
buf.writeBigUint64LE(jobid);
return this.do(Twait, jobid, buf);
}
cancel(jobid) {
const buf = Buffer.alloc(4);
buf.writeUint32LE(jobid);
const buf = Buffer.alloc(8);
buf.writeBigUint64LE(jobid);
return this.do(Tcancel, jobid, buf);
}
async getOutput(jobid) {
const req = await request(`${this.httpurl}?id=${jobid}`, {
const req = await request(`${this.httpurl}/image?id=${jobid}`, {
headers: {
authentication: this.auth || undefined
}
@ -149,7 +140,18 @@ class ImageConnection {
type = contentType;
break;
}
return { buffer: Buffer.from(await req.body.arrayBuffer()), type };
return { arrayBuffer: await req.body.arrayBuffer(), type };
}
async getCount() {
const req = await request(`${this.httpurl}/count`, {
headers: {
authentication: this.auth || undefined
}
});
if (req.statusCode !== 200) return;
const res = parseInt(await req.body.text());
return res;
}
async do(op, id, data) {

View file

@ -1,4 +1,41 @@
export function log(type, content) { return content ? console[type](content) : console.info(type); }
import winston from "winston";
import "winston-daily-rotate-file";
export const logger = winston.createLogger({
levels: {
error: 0,
warn: 1,
info: 2,
main: 3,
debug: 4
},
transports: [
new winston.transports.Console({ format: winston.format.colorize({ all: true }), stderrLevels: ["error", "warn"] }),
new winston.transports.DailyRotateFile({ filename: "logs/error-%DATE%.log", level: "error", zippedArchive: true, maxSize: 4194304, maxFiles: 8 }),
new winston.transports.DailyRotateFile({ filename: "logs/main-%DATE%.log", zippedArchive: true, maxSize: 4194304, maxFiles: 8 })
],
level: process.env.DEBUG_LOG ? "debug" : "main",
format: winston.format.combine(
winston.format.timestamp({ format: "YYYY-MM-DD HH:mm:ss" }),
winston.format.printf((info) => {
const {
timestamp, level, message, ...args
} = info;
return `[${timestamp}]: [${level.toUpperCase()}] - ${message} ${Object.keys(args).length ? JSON.stringify(args, null, 2) : ""}`;
}),
)
});
winston.addColors({
info: "green",
main: "gray",
debug: "magenta",
warn: "yellow",
error: "red"
});
export function log(type, content) { return content ? logger.log(type === "log" ? "main" : type, content) : logger.info(type); }
export function error(...args) { return log("error", ...args); }

View file

@ -1,7 +1,14 @@
import util from "util";
import fs from "fs";
import pm2 from "pm2";
import { config } from "dotenv";
// playing messages
const { messages } = JSON.parse(fs.readFileSync(new URL("../config/messages.json", import.meta.url)));
const { types } = JSON.parse(fs.readFileSync(new URL("../config/commands.json", import.meta.url)));
let broadcast = false;
// random(array) to select a random entry in array
export function random(array) {
if (!array || array.length < 1) return null;
@ -41,4 +48,62 @@ export function clean(text) {
// textEncode(string) to encode characters for image processing
export function textEncode(string) {
return string.replaceAll("&", "&amp;").replaceAll(">", "&gt;").replaceAll("<", "&lt;").replaceAll("\"", "&quot;").replaceAll("'", "&apos;").replaceAll("\\n", "\n").replaceAll("\\:", ":");
}
// set activity (a.k.a. the gamer code)
export function activityChanger(bot) {
if (!broadcast) {
bot.editStatus("dnd", {
name: random(messages) + (types.classic ? ` | @${bot.user.username} help` : ""),
});
}
setTimeout(() => activityChanger(bot), 900000);
}
export function checkBroadcast(bot) {
/*if () {
startBroadcast(bot, message);
}*/
}
export function startBroadcast(bot, message) {
bot.editStatus("dnd", {
name: message + (types.classic ? ` | @${bot.user.username} help` : ""),
});
broadcast = true;
}
export function endBroadcast(bot) {
bot.editStatus("dnd", {
name: random(messages) + (types.classic ? ` | @${bot.user.username} help` : ""),
});
broadcast = false;
}
export function getServers() {
return new Promise((resolve, reject) => {
if (process.env.PM2_USAGE) {
pm2.launchBus((err, pm2Bus) => {
const listener = (packet) => {
if (packet.data?.type === "countResponse") {
resolve(packet.data.serverCount);
pm2Bus.off("process:msg");
}
};
pm2Bus.on("process:msg", listener);
});
pm2.sendDataToProcessId(0, {
id: 0,
type: "process:msg",
data: {
type: "getCount"
},
topic: true
}, (err) => {
if (err) reject(err);
});
} else {
resolve(0);
}
});
}

125
utils/pm2/ext.js Normal file
View file

@ -0,0 +1,125 @@
import pm2 from "pm2";
import { Api } from "@top-gg/sdk";
import winston from "winston";
// load config from .env file
import { resolve, dirname } from "path";
import { fileURLToPath } from "url";
import { config } from "dotenv";
config({ path: resolve(dirname(fileURLToPath(import.meta.url)), "../../.env") });
const dbl = process.env.NODE_ENV === "production" && process.env.DBL ? new Api(process.env.DBL) : null;
const logger = winston.createLogger({
levels: {
error: 0,
warn: 1,
info: 2,
main: 3,
debug: 4
},
transports: [
new winston.transports.Console({ format: winston.format.colorize({ all: true }), stderrLevels: ["error", "warn"] })
],
level: process.env.DEBUG_LOG ? "debug" : "main",
format: winston.format.combine(
winston.format.timestamp({ format: "YYYY-MM-DD HH:mm:ss" }),
winston.format.printf((info) => {
const {
timestamp, level, message, ...args
} = info;
return `[${timestamp}]: [${level.toUpperCase()}] - ${message} ${Object.keys(args).length ? JSON.stringify(args, null, 2) : ""}`;
}),
)
});
winston.addColors({
info: "green",
main: "gray",
debug: "magenta",
warn: "yellow",
error: "red"
});
let serverCount = 0;
let shardCount = 0;
let clusterCount = 0;
let responseCount = 0;
let timeout;
process.on("message", (packet) => {
if (packet.data?.type === "getCount") {
process.send({
type: "process:msg",
data: {
type: "countResponse",
serverCount
}
});
}
});
function updateStats() {
return new Promise((resolve, reject) => {
pm2.list((err, list) => {
if (err) reject(err);
const clusters = list.filter((v) => v.name === "esmBot");
clusterCount = clusters.length;
const listener = (packet) => {
if (packet.data?.type === "serverCounts") {
clearTimeout(timeout);
serverCount += packet.data.guilds;
shardCount += packet.data.shards;
responseCount += 1;
if (responseCount >= clusterCount) {
resolve();
process.removeListener("message", listener);
} else {
timeout = setTimeout(() => {
reject();
process.removeListener("message", listener);
}, 5000);
}
}
};
timeout = setTimeout(() => {
reject();
process.removeListener("message", listener);
}, 5000);
process.on("message", listener);
process.send({
type: "process:msg",
data: {
type: "serverCounts"
}
});
});
});
}
async function dblPost() {
logger.main("Posting stats to Top.gg...");
serverCount = 0;
shardCount = 0;
clusterCount = 0;
responseCount = 0;
try {
//await updateStats();
await dbl.postStats({
serverCount,
shardCount
});
logger.main("Stats posted.");
} catch (e) {
logger.error(e);
}
}
setInterval(updateStats, 300000);
if (dbl) setInterval(dblPost, 1800000);
setTimeout(updateStats, 10000);
logger.info("Started esmBot management process.");

View file

@ -1,268 +0,0 @@
import { BaseServiceWorker } from "eris-fleet";
import * as logger from "../logger.js";
import fs from "fs";
import path from "path";
import { fileURLToPath } from "url";
import { Worker } from "worker_threads";
import { createRequire } from "module";
import { createServer } from "http";
import { request } from "undici";
import EventEmitter from "events";
// only requiring this to work around an issue regarding worker threads
const nodeRequire = createRequire(import.meta.url);
if (!process.env.API_TYPE || process.env.API_TYPE === "none") {
nodeRequire(`../../build/${process.env.DEBUG && process.env.DEBUG === "true" ? "Debug" : "Release"}/image.node`);
}
import ImageConnection from "../imageConnection.js";
class ImageWorker extends BaseServiceWorker {
constructor(setup) {
super(setup);
console.info = (str) => this.ipc.sendToAdmiral("info", str);
if (process.env.API_TYPE === "ws") {
this.connections = new Map();
this.servers = JSON.parse(fs.readFileSync(new URL("../../config/servers.json", import.meta.url), { encoding: "utf8" })).image;
this.nextID = 0;
} else if (process.env.API_TYPE === "azure") {
this.jobs = new Map();
this.webhook = createServer();
this.port = parseInt(process.env.WEBHOOK_PORT) || 3763;
}
this.begin().then(() => this.serviceReady());
}
async begin() {
// connect to image api if enabled
if (process.env.API_TYPE === "ws") {
for (const server of this.servers) {
try {
await this.connect(server.server, server.auth);
} catch (e) {
logger.error(e);
}
}
} else if (process.env.API_TYPE === "azure") {
this.webhook.on("request", async (req, res) => {
if (req.method !== "POST") {
res.statusCode = 405;
return res.end("405 Method Not Allowed");
}
if (process.env.AZURE_PASS && req.headers.authorization !== process.env.AZURE_PASS) {
res.statusCode = 401;
return res.end("401 Unauthorized");
}
const reqUrl = new URL(req.url, `http://${req.headers.host}`);
if (reqUrl.pathname === "/callback") {
try {
const chunks = [];
req.on("data", (data) => {
chunks.push(data);
});
req.once("end", () => {
if (this.jobs.has(req.headers["x-azure-id"])) {
try {
const error = JSON.parse(Buffer.concat(chunks).toString());
if (error.error) this.jobs.get(req.headers["x-azure-id"]).emit("error", new Error(error.message));
} catch {
// no-op
}
const contentType = req.headers["content-type"];
let type;
switch (contentType) {
case "image/gif":
type = "gif";
break;
case "image/png":
type = "png";
break;
case "image/jpeg":
type = "jpg";
break;
case "image/webp":
type = "webp";
break;
default:
type = contentType;
break;
}
this.jobs.get(req.headers["x-azure-id"]).emit("image", { buffer: Buffer.concat(chunks), type });
return res.end("OK");
} else {
res.statusCode = 409;
return res.end("409 Conflict");
}
});
} catch (e) {
logger.error("An error occurred while processing a webhook request: ", e);
res.statusCode = 500;
return res.end("500 Internal Server Error");
}
} else {
res.statusCode = 404;
return res.end("404 Not Found");
}
});
this.webhook.on("error", (e) => {
logger.error("An error occurred on the Azure webhook: ", e);
});
this.webhook.listen(this.port, () => {
logger.log(`Azure HTTP webhook listening on port ${this.port}`);
});
}
}
async repopulate() {
const data = await fs.promises.readFile(new URL("../../config/servers.json", import.meta.url), { encoding: "utf8" });
this.servers = JSON.parse(data).image;
return;
}
async getRunning() {
const statuses = [];
if (process.env.API_TYPE === "ws") {
for (const [address, connection] of this.connections) {
if (connection.conn.readyState !== 0 && connection.conn.readyState !== 1) {
continue;
}
statuses.push({
address,
runningJobs: connection.njobs,
max: connection.max
});
}
}
return statuses;
}
async chooseServer(ideal) {
if (ideal.length === 0) throw "No available servers";
const sorted = ideal.sort((a, b) => {
return a.load - b.load;
}).filter((e, i, array) => {
return !(e.load < array[0].load);
});
return sorted[0];
}
async getIdeal(object) {
const idealServers = [];
for (const [address, connection] of this.connections) {
if (connection.conn.readyState !== 0 && connection.conn.readyState !== 1) {
continue;
}
if (object.params.type && !connection.formats[object.cmd]?.includes(object.params.type)) continue;
idealServers.push({
addr: address,
load: connection.njobs / connection.max
});
}
const server = await this.chooseServer(idealServers);
return this.connections.get(server.addr);
}
async connect(server, auth) {
const connection = new ImageConnection(server, auth);
this.connections.set(server, connection);
}
async disconnect() {
for (const connection of this.connections.values()) {
connection.close();
}
this.connections.clear();
return;
}
waitForWorker(worker) {
return new Promise((resolve, reject) => {
worker.once("message", (data) => {
resolve({
buffer: Buffer.from([...data.buffer]),
type: data.fileExtension
});
});
worker.once("error", reject);
});
}
waitForAzure(event) {
return new Promise((resolve, reject) => {
event.once("image", (data) => {
resolve(data);
});
event.once("error", reject);
});
}
async run(object) {
if (process.env.API_TYPE === "ws") {
let num = this.nextID++;
if (num > 4294967295) num = this.nextID = 0;
for (let i = 0; i < 3; i++) {
const currentServer = await this.getIdeal(object);
try {
await currentServer.queue(num, object);
await currentServer.wait(num);
const output = await currentServer.getOutput(num);
return output;
} catch (e) {
if (i < 2 && e === "Request ended prematurely due to a closed connection") {
continue;
} else {
if (e === "No available servers" && i >= 2) throw "Request ended prematurely due to a closed connection";
throw e;
}
}
}
} else if (process.env.API_TYPE === "azure") {
object.callback = `${process.env.AZURE_CALLBACK_URL}:${this.port}/callback`;
const response = await request(`${process.env.AZURE_URL}/api/orchestrators/ImageOrchestrator`, { method: "POST", body: JSON.stringify(object) }).then(r => r.body.json());
const event = new EventEmitter();
this.jobs.set(response.id, event);
return await this.waitForAzure(event);
} else {
// Called from command (not using image API)
const worker = new Worker(path.join(path.dirname(fileURLToPath(import.meta.url)), "../image-runner.js"), {
workerData: object
});
return await this.waitForWorker(worker);
}
}
async handleCommand(data) {
try {
if (data.type === "run") {
const result = await this.run(data.obj);
return result;
} else if (data.type === "reload") {
await this.disconnect();
await this.repopulate();
let amount = 0;
for (const server of this.servers) {
try {
await this.connect(server.server, server.auth);
amount += 1;
} catch (e) {
logger.error(e);
}
}
return amount;
} else if (data.type === "stats") {
return await this.getRunning();
}
} catch (err) {
return { err: typeof err === "string" ? err : err.message };
}
}
shutdown(done) {
done();
}
}
export default ImageWorker;

View file

@ -1,7 +1,9 @@
import * as logger from "../utils/logger.js";
import { readdir, lstat, rm, writeFile } from "fs/promises";
import { readdir, lstat, rm, writeFile, stat } from "fs/promises";
export async function upload(client, ipc, result, context, interaction = false) {
let dirSizeCache;
export async function upload(client, result, context, interaction = false) {
const filename = `${Math.random().toString(36).substring(2, 15)}.${result.name.split(".")[1]}`;
await writeFile(`${process.env.TEMPDIR}/${filename}`, result.file);
const imageURL = `${process.env.TMP_DOMAIN || "https://tmp.projectlounge.pw"}/${filename}`;
@ -34,13 +36,13 @@ export async function upload(client, ipc, result, context, interaction = false)
}));
}
if (process.env.THRESHOLD) {
const size = await ipc.centralStore.get("dirSizeCache") + result.file.length;
await ipc.centralStore.set("dirSizeCache", size);
await removeOldImages(ipc, size);
const size = dirSizeCache + result.file.length;
dirSizeCache = size;
await removeOldImages(size);
}
}
export async function removeOldImages(ipc, size) {
async function removeOldImages(size) {
if (size > process.env.THRESHOLD) {
const files = (await readdir(process.env.TEMPDIR)).map((file) => {
return lstat(`${process.env.TEMPDIR}/${file}`).then((stats) => {
@ -67,6 +69,30 @@ export async function removeOldImages(ipc, size) {
const newSize = oldestFiles.reduce((a, b) => {
return a + b.size;
}, 0);
await ipc.centralStore.set("dirSizeCache", newSize);
dirSizeCache = newSize;
}
}
export async function parseThreshold() {
const matched = process.env.THRESHOLD.match(/(\d+)([KMGT])/);
const sizes = {
K: 1024,
M: 1048576,
G: 1073741824,
T: 1099511627776
};
if (matched && matched[1] && matched[2]) {
process.env.THRESHOLD = matched[1] * sizes[matched[2]];
} else {
logger.error("Invalid THRESHOLD config.");
process.env.THRESHOLD = undefined;
}
const dirstat = (await readdir(process.env.TEMPDIR)).map((file) => {
return stat(`${process.env.TEMPDIR}/${file}`).then((stats) => stats.size);
});
const size = await Promise.all(dirstat);
const reduced = size.reduce((a, b) => {
return a + b;
}, 0);
dirSizeCache = reduced;
}