diff --git a/api/index.js b/api/index.js index 52ebf65..9afaf5f 100644 --- a/api/index.js +++ b/api/index.js @@ -1,12 +1,10 @@ -// code originally provided by tzlil - require("dotenv").config(); const os = require("os"); //const { run } = require("../utils/image-runner.js"); const { Worker } = require("worker_threads"); const path = require("path"); -const net = require("net"); const http = require("http"); +const WebSocket = require("ws"); const start = process.hrtime(); const log = (msg, jobNum) => { @@ -23,7 +21,7 @@ class JobCache extends Map { } const jobs = new JobCache(); -// Should look like UUID : { addr : "someaddr", port: someport, msg: "request" } +// Should look like UUID : { msg: "request", num: } const queue = []; // Array of UUIDs @@ -45,7 +43,7 @@ const acceptJob = (uuid, sock) => { }).catch((err) => { console.error(`Error on job ${uuid}:`, err); jobs.delete(uuid); - sock.write(Buffer.concat([Buffer.from([0x2]), Buffer.from(uuid), Buffer.from(err.message)])); + sock.send(Buffer.concat([Buffer.from([0x2]), Buffer.from(uuid), Buffer.from(err.message)])); }).finally(() => { jobAmount--; if (queue.length > 0) { @@ -54,16 +52,68 @@ const acceptJob = (uuid, sock) => { }); }; -const httpServer = http.createServer((req, res) => { - if (req.method !== "GET") { +const wss = new WebSocket.Server({ clientTracking: true, noServer: true }); + +wss.on("connection", (ws, request) => { + log(`WS client ${request.socket.remoteAddress}:${request.socket.remotePort} has connected`); + + ws.on("error", (err) => { + console.error(err); + }); + + ws.on("message", (msg) => { + const opcode = msg.readUint8(0); + const req = msg.toString().slice(1,msg.length); + console.log(req); + // 0x00 == Cancel job + // 0x01 == Queue job + if (opcode == 0x00) { + delete queue[queue.indexOf(req) - 1]; + jobs.delete(req); + } else if (opcode == 0x01) { + const length = parseInt(req.slice(0, 1)); + const num = req.slice(1, length + 1); + const obj = req.slice(length + 1); + const job = { msg: obj, num: jobAmount }; + const uuid = uuidv4(); + jobs.set(uuid, job); + queue.push(uuid); + + const newBuffer = Buffer.concat([Buffer.from([0x00]), Buffer.from(uuid), Buffer.from(num)]); + ws.send(newBuffer); + + if (jobAmount < MAX_JOBS) { + log(`Got WS request for job ${job.msg} with id ${uuid}`, job.num); + acceptJob(uuid, ws); + } else { + log(`Got WS request for job ${job.msg} with id ${uuid}, queued in position ${queue.indexOf(uuid)}`, job.num); + } + } else { + log("Could not parse WS message"); + } + }); + + ws.on("close", () => { + log(`WS client ${request.socket.remoteAddress}:${request.socket.remotePort} has disconnected`); + }); +}); + +wss.on("error", (err) => { + console.error("A WS error occurred: ", err); +}); + +const httpServer = http.createServer(); + +httpServer.on("request", async (req, res) => { + if (req.method !== "GET" && req.method !== "POST") { res.statusCode = 405; return res.end("405 Method Not Allowed"); } const reqUrl = new URL(req.url, `http://${req.headers.host}`); - if (reqUrl.pathname === "/status") { + if (reqUrl.pathname === "/status" && req.method === "GET") { log(`Sending server status to ${req.socket.remoteAddress}:${req.socket.remotePort} via HTTP`); return res.end(Buffer.from((MAX_JOBS - jobAmount).toString())); - } else if (reqUrl.pathname === "/running") { + } else if (reqUrl.pathname === "/running" && req.method === "GET") { log(`Sending currently running jobs to ${req.socket.remoteAddress}:${req.socket.remotePort} via HTTP`); const keys = jobs.keys(); const newObject = { queued: queue.length, runningJobs: jobAmount, max: MAX_JOBS }; @@ -79,7 +129,7 @@ const httpServer = http.createServer((req, res) => { } } return res.end(JSON.stringify(newObject)); - } else if (reqUrl.pathname === "/image") { + } else if (reqUrl.pathname === "/image" && req.method === "GET") { if (!reqUrl.searchParams.has("id")) { res.statusCode = 400; return res.end("400 Bad Request"); @@ -102,64 +152,24 @@ const httpServer = http.createServer((req, res) => { } }); +httpServer.on("upgrade", (req, sock, head) => { + const reqUrl = new URL(req.url, `http://${req.headers.host}`); + + if (reqUrl.pathname === "/sock") { + wss.handleUpgrade(req, sock, head, (ws) => { + wss.emit("connection", ws, req); + }); + } else { + sock.destroy(); + } +}); + httpServer.on("error", (e) => { console.error("An HTTP error occurred: ", e); }); -httpServer.listen(8081, () => { - log("HTTP listening on port 8081"); -}); - -const server = net.createServer((sock) => { // Create a TCP socket/server to listen to requests - log(`TCP client ${sock.remoteAddress}:${sock.remotePort} has connected`); - - sock.on("error", (e) => { - console.error(e); - }); - - sock.on("data", (msg) => { - const opcode = msg.readUint8(0); - const req = msg.toString().slice(1,msg.length); - console.log(req); - // 0x00 == Cancel job - // 0x01 == Queue job - if (opcode == 0x00) { - delete queue[queue.indexOf(req) - 1]; - jobs.delete(req); - } else if (opcode == 0x01) { - const length = parseInt(req.slice(0, 1)); - const num = req.slice(1, length + 1); - const obj = req.slice(length + 1); - const job = { addr: sock.remoteAddress, port: sock.remotePort, msg: obj, num: jobAmount }; - const uuid = uuidv4(); - jobs.set(uuid, job); - queue.push(uuid); - - const newBuffer = Buffer.concat([Buffer.from([0x00]), Buffer.from(uuid), Buffer.from(num)]); - sock.write(newBuffer); - - if (jobAmount < MAX_JOBS) { - log(`Got TCP request for job ${job.msg} with id ${uuid}`, job.num); - acceptJob(uuid, sock); - } else { - log(`Got TCP request for job ${job.msg} with id ${uuid}, queued in position ${queue.indexOf(uuid)}`, job.num); - } - } else { - log("Could not parse TCP message"); - } - }); - - sock.on("end", () => { - log(`TCP client ${sock.remoteAddress}:${sock.remotePort} has disconnected`); - }); -}); - -server.on("error", (e) => { - console.error("A TCP error occurred: ", e); -}); - -server.listen(8080, () => { - log("TCP listening on port 8080"); +httpServer.listen(8080, () => { + log("HTTP listening on port 8080"); }); const runJob = (job, sock) => { @@ -187,8 +197,7 @@ const runJob = (job, sock) => { jobObject.data = data.buffer; jobObject.ext = data.fileExtension; jobs.set(job.uuid, jobObject); - sock.write(Buffer.concat([Buffer.from([0x1]), Buffer.from(job.uuid)]), (e) => { - if (e) return reject(e); + sock.send(Buffer.concat([Buffer.from([0x1]), Buffer.from(job.uuid)]), () => { return resolve(); }); }); diff --git a/classes/imageCommand.js b/classes/imageCommand.js index da55ed1..57db2c8 100644 --- a/classes/imageCommand.js +++ b/classes/imageCommand.js @@ -108,7 +108,7 @@ class ImageCommand extends Command { name: `${this.constructor.command}.${type}` }; } catch (e) { - if (e.toString().includes("Not connected to image server")) return "I'm still trying to connect to the image servers. Please wait a little bit."; + if (e.toString().includes("Not connected to image server")) return "I can't seem to contact the image servers, they might be down or still trying to start up. Please wait a little bit."; throw e; } finally { if (status && await this.client.getMessage(status.channel.id, status.id).catch(() => undefined)) await status.delete(); diff --git a/events/messageCreate.js b/events/messageCreate.js index 3b09abb..fa39942 100644 --- a/events/messageCreate.js +++ b/events/messageCreate.js @@ -2,7 +2,7 @@ const fs = require("fs"); const database = require("../utils/database.js"); const logger = require("../utils/logger.js"); const collections = require("../utils/collections.js"); -const commands = [...collections.aliases.keys(), ...collections.commands.keys()]; +//const commands = [...collections.aliases.keys(), ...collections.commands.keys()]; // run when someone sends a message module.exports = async (client, cluster, ipc, message) => { @@ -12,16 +12,6 @@ module.exports = async (client, cluster, ipc, message) => { // don't run command if bot can't send messages if (message.channel.guild && !message.channel.permissionsOf(client.user.id).has("sendMessages")) return; - // this is here to prevent reading the database if a message is unrelated - let valid = false; - for (const key of commands) { - if (message.content.toLowerCase().includes(key)) { - valid = true; - break; - } - } - if (!valid) return; - let prefixCandidate; if (message.channel.guild) { if (collections.prefixCache.has(message.channel.guild.id)) { @@ -96,8 +86,8 @@ module.exports = async (client, cluster, ipc, message) => { try { await database.addCount(collections.aliases.has(command) ? collections.aliases.get(command) : command); const startTime = new Date(); - const commandClass = new cmd(client, cluster, ipc, message, args, message.content.substring(prefix.length).trim().replace(command, "").trim()); - const result = await commandClass.run(); // we also provide the message content as a parameter for cases where we need more accuracy + const commandClass = new cmd(client, cluster, ipc, message, args, message.content.substring(prefix.length).trim().replace(command, "").trim()); // we also provide the message content as a parameter for cases where we need more accuracy + const result = await commandClass.run(); const endTime = new Date(); if ((endTime - startTime) >= 180000) reference.allowedMentions.repliedUser = true; if (typeof result === "string") { diff --git a/messages.json b/messages.json index f990861..0a39734 100644 --- a/messages.json +++ b/messages.json @@ -139,6 +139,7 @@ "WhatsApp", "Half Life 3", "Tower Defense Simulator", - "roblox on linux 2021" + "roblox on linux 2021", + "trans rights" ] } \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index 8983480..29f5af7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -22,7 +22,8 @@ "node-emoji": "^1.10.0", "node-fetch": "^2.6.1", "qrcode": "^1.4.4", - "sharp": "^0.28.2" + "sharp": "^0.28.2", + "ws": "*" }, "devDependencies": { "@babel/eslint-parser": "^7.13.8", @@ -39,6 +40,7 @@ "mongoose": "^5.11.8", "pg": "^8.5.1", "uuid": "^8.3.1", + "ws": "^7.5.0", "zlib-sync": "^0.1.6" } }, @@ -3519,9 +3521,9 @@ "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=" }, "node_modules/ws": { - "version": "7.4.6", - "resolved": "https://registry.npmjs.org/ws/-/ws-7.4.6.tgz", - "integrity": "sha512-YmhHDO4MzaDLB+M9ym/mDA5z0naX8j7SIlT8f8z+I0VtzsRbekxEutHSme7NPS2qE8StCYQNUnfWdXta/Yu85A==", + "version": "7.5.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.0.tgz", + "integrity": "sha512-6ezXvzOZupqKj4jUqbQ9tXuJNo+BR2gU8fFRk3XCP3e0G6WT414u5ELe6Y0vtp7kmSJ3F7YWObSNr1ESsgi4vw==", "engines": { "node": ">=8.3.0" }, @@ -6334,9 +6336,9 @@ "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=" }, "ws": { - "version": "7.4.6", - "resolved": "https://registry.npmjs.org/ws/-/ws-7.4.6.tgz", - "integrity": "sha512-YmhHDO4MzaDLB+M9ym/mDA5z0naX8j7SIlT8f8z+I0VtzsRbekxEutHSme7NPS2qE8StCYQNUnfWdXta/Yu85A==", + "version": "7.5.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.0.tgz", + "integrity": "sha512-6ezXvzOZupqKj4jUqbQ9tXuJNo+BR2gU8fFRk3XCP3e0G6WT414u5ELe6Y0vtp7kmSJ3F7YWObSNr1ESsgi4vw==", "requires": {} }, "xtend": { diff --git a/package.json b/package.json index 53be613..caf38ab 100644 --- a/package.json +++ b/package.json @@ -51,6 +51,7 @@ "mongoose": "^5.11.8", "pg": "^8.5.1", "uuid": "^8.3.1", + "ws": "^7.5.0", "zlib-sync": "^0.1.6" } } diff --git a/utils/image.js b/utils/image.js index d368a70..5443760 100644 --- a/utils/image.js +++ b/utils/image.js @@ -2,7 +2,7 @@ const magick = require("../build/Release/image.node"); const { Worker } = require("worker_threads"); const fetch = require("node-fetch"); const fs = require("fs"); -const net = require("net"); +const WebSocket = require("ws"); const fileType = require("file-type"); const path = require("path"); const { EventEmitter } = require("events"); @@ -12,7 +12,7 @@ const formats = ["image/jpeg", "image/png", "image/webp", "image/gif", "video/mp exports.jobs = {}; -exports.connections = []; +exports.connections = new Map(); exports.servers = JSON.parse(fs.readFileSync("./servers.json", { encoding: "utf8" })).image; @@ -32,17 +32,18 @@ exports.repopulate = async () => { exports.getStatus = () => { return new Promise((resolve, reject) => { - let serversLeft = this.connections.length; + let serversLeft = this.connections.size; const statuses = []; const timeout = setTimeout(() => { resolve(statuses); }, 5000); - for (const connection of this.connections) { - if (!connection.remoteAddress) { + for (const address of this.connections.keys()) { + const connection = this.connections.get(address); + if (connection.readyState !== 1) { serversLeft--; continue; } - fetch(`http://${connection.remoteAddress}:8081/running`).then(statusRequest => statusRequest.json()).then((status) => { + fetch(`http://${address}:8080/running`).then(statusRequest => statusRequest.json()).then((status) => { serversLeft--; statuses.push(status); if (!serversLeft) { @@ -67,16 +68,15 @@ exports.getStatus = () => { exports.connect = (server) => { return new Promise((resolve, reject) => { - const connection = net.createConnection(8080, server); + const connection = new WebSocket(`ws://${server}:8080/sock`); const timeout = setTimeout(() => { - const connectionIndex = this.connections.indexOf(connection); - if (connectionIndex < 0) delete this.connections[connectionIndex]; + this.connections.delete(server); reject(`Failed to connect to ${server}`); }, 5000); - connection.once("connect", () => { + connection.once("open", () => { clearTimeout(timeout); }); - connection.on("data", async (msg) => { + connection.on("message", async (msg) => { const opcode = msg.readUint8(0); const req = msg.slice(37, msg.length); const uuid = msg.slice(1, 37).toString(); @@ -87,7 +87,7 @@ exports.connect = (server) => { } else if (opcode === 0x01) { // Job completed successfully // the image API sends all job responses over the same socket; make sure this is ours if (this.jobs[uuid]) { - const imageReq = await fetch(`http://${connection.remoteAddress}:8081/image?id=${uuid}`); + const imageReq = await fetch(`http://${server}:8080/image?id=${uuid}`); const image = await imageReq.buffer(); // The response data is given as the file extension/ImageMagick type of the image (e.g. "png"), followed // by a newline, followed by the image data. @@ -103,49 +103,61 @@ exports.connect = (server) => { connection.on("error", (e) => { logger.error(e.toString()); }); - connection.once("end", () => { + connection.once("close", () => { for (const uuid of Object.keys(this.jobs)) { - if (this.jobs[uuid].addr === connection.remoteAddress) this.jobs[uuid].event.emit("error", "Job ended prematurely due to a closed connection; please run your image job again"); + if (this.jobs[uuid].addr === server) this.jobs[uuid].event.emit("error", "Job ended prematurely due to a closed connection; please run your image job again"); } + logger.log(`Lost connection to ${server}, attempting to reconnect...`); + this.connections.delete(server); + //this.connect(server); }); - this.connections.push(connection); + this.connections.set(server, connection); resolve(); }); }; exports.disconnect = async () => { - for (const connection of this.connections) { - connection.destroy(); + for (const connection of this.connections.values()) { + connection.close(); } for (const uuid of Object.keys(this.jobs)) { this.jobs[uuid].event.emit("error", "Job ended prematurely (not really an error; just run your image job again)"); delete this.jobs[uuid]; } - this.connections = []; + this.connections.clear(); return; }; const getIdeal = () => { return new Promise((resolve, reject) => { - let serversLeft = this.connections.length; + let serversLeft = this.connections.size; + if (serversLeft === 0) { + for (const server of this.servers) { + this.connect(server).catch(e => { + logger.error(e); + }); + } + serversLeft = this.connections.size; + } const idealServers = []; const timeout = setTimeout(async () => { try { const server = await chooseServer(idealServers); - resolve(this.connections.find(val => val.remoteAddress === server.addr)); + resolve({ addr: server.addr, sock: this.connections.get(server.addr) }); } catch (e) { reject(e); } }, 5000); - for (const connection of this.connections) { - if (!connection.remoteAddress) { + for (const address of this.connections.keys()) { + const connection = this.connections.get(address); + if (connection.readyState !== 1) { serversLeft--; continue; } - fetch(`http://${connection.remoteAddress}:8081/status`).then(statusRequest => statusRequest.text()).then((status) => { + fetch(`http://${address}:8080/status`).then(statusRequest => statusRequest.text()).then((status) => { serversLeft--; idealServers.push({ - addr: connection.remoteAddress, + addr: address, load: parseInt(status) }); return; @@ -153,7 +165,7 @@ const getIdeal = () => { if (!serversLeft) { clearTimeout(timeout); const server = await chooseServer(idealServers); - resolve(this.connections.find(val => val.remoteAddress === server.addr)); + resolve({ addr: server.addr, sock: this.connections.get(server.addr) }); } }).catch(e => { if (e.code === "ECONNREFUSED") { @@ -166,7 +178,7 @@ const getIdeal = () => { if (!serversLeft) { clearTimeout(timeout); chooseServer(idealServers).then(server => { - resolve(this.connections.find(val => val.remoteAddress === server.addr)); + resolve({ addr: server.addr, sock: this.connections.get(server.addr) }); }).catch(e => reject(e)); } }); @@ -175,30 +187,10 @@ const getIdeal = () => { const start = (object, num) => { return getIdeal().then(async (currentServer) => { const data = Buffer.concat([Buffer.from([0x01 /* queue job */]), Buffer.from(num.length.toString()), Buffer.from(num), Buffer.from(JSON.stringify(object))]); - return new Promise((resolve, reject) => { - if (currentServer.destroyed) { - logger.log(`Lost connection to ${currentServer.remoteAddress}, attempting to reconnect...`); - currentServer.connect(8080, currentServer.remoteAddress, () => { - const res = start(object, num); - reject(res); // this is done to differentiate the result from a step - }); - } else { - currentServer.write(data, (err) => { - if (err) { - if (err.code === "EPIPE") { - logger.log(`Lost connection to ${currentServer.remoteAddress}, attempting to reconnect...`); - currentServer.connect(8080, currentServer.remoteAddress, () => { - const res = start(object, num); - reject(res); // this is done to differentiate the result from a step - }); - } else { - reject(err); - } - } else { - resolve(currentServer.remoteAddress); - } - }); - } + return new Promise((resolve) => { + currentServer.sock.send(data, () => { + resolve(currentServer.addr); + }); }); }).catch((result) => { throw result;