From c2bd68263bea092d5929e0978d4113e9202ab7ab Mon Sep 17 00:00:00 2001 From: TheEssem Date: Fri, 8 Jan 2021 12:08:10 -0600 Subject: [PATCH] Made image API node chooser sort by job amount instead of CPU usage, added soundreload --- api/index.js | 121 +++++++++++++++---------------------- commands/soundreload.js | 16 +++++ utils/database/postgres.js | 3 +- utils/image.js | 2 +- utils/soundplayer.js | 1 + 5 files changed, 70 insertions(+), 73 deletions(-) create mode 100644 commands/soundreload.js diff --git a/api/index.js b/api/index.js index 0d1a178..5abc7dd 100644 --- a/api/index.js +++ b/api/index.js @@ -9,8 +9,8 @@ const dgram = require("dgram"); // for UDP servers const socket = dgram.createSocket("udp4"); // Our universal UDP socket, this might cause issues and we may have to use a seperate socket for each connection const start = process.hrtime(); -const log = (msg) => { - console.log(`[${process.hrtime(start)[1] / 1000000}]\t ${msg}`); +const log = (msg, jobNum) => { + console.log(`[${process.hrtime(start)[1] / 1000000}${jobNum !== undefined ? `:${jobNum}` : ""}]\t ${msg}`); }; const jobs = {}; @@ -19,32 +19,6 @@ const queue = []; // Array of UUIDs const { v4: uuidv4 } = require("uuid"); -let cpuLoad = 0; - -const getAverage = () => { - const cpus = os.cpus(); - let idle = 0; - let tick = 0; - for (const cpu of cpus) { - for (const type in cpu.times) { - tick += cpu.times[type]; - } - idle += cpu.times.idle; - } - return { - idle: idle / cpus.length, - tick: tick / cpus.length - }; -}; - -let measure = getAverage(); -setInterval(() => { - const newMeasure = getAverage(); - const idleDiff = newMeasure.idle - measure.idle; - const tickDiff = newMeasure.tick - measure.tick; - cpuLoad = 100 - ~~(100 * idleDiff / tickDiff); - measure = newMeasure; -}, 5000); const MAX_JOBS = process.env.JOBS !== "" && process.env.JOBS !== undefined ? parseInt(process.env.JOBS) : os.cpus().length * 4; // Completely arbitrary, should usually be some multiple of your amount of cores let jobAmount = 0; @@ -57,7 +31,8 @@ const acceptJob = async (uuid) => { uuid: uuid, msg: jobs[uuid].msg, addr: jobs[uuid].addr, - port: jobs[uuid].port + port: jobs[uuid].port, + num: jobs[uuid].num }); jobAmount--; if (queue.length > 0) { @@ -87,22 +62,22 @@ server.on("message", (msg, rinfo) => { delete queue[queue.indexOf(req) - 1]; delete jobs[req]; } else if (opcode == 0x1) { - const job = { addr: rinfo.address, port: rinfo.port, msg: req }; + const job = { addr: rinfo.address, port: rinfo.port, msg: req, num: jobAmount }; const uuid = uuidv4(); jobs[uuid] = job; queue.push(uuid); if (jobAmount < MAX_JOBS) { - log(`Got request for job ${job.msg} with id ${uuid}`, job.threadNum); + log(`Got request for job ${job.msg} with id ${uuid}`, job.num); acceptJob(uuid); } else { - log(`Got request for job ${job.msg} with id ${uuid}, queued in position ${queue.indexOf(uuid)}`, job.threadNum); + log(`Got request for job ${job.msg} with id ${uuid}, queued in position ${queue.indexOf(uuid)}`, job.num); } const newBuffer = Buffer.concat([Buffer.from([0x0]), Buffer.from(uuid)]); socket.send(newBuffer, rinfo.port, rinfo.address); } else if (opcode == 0x2) { - socket.send(Buffer.concat([Buffer.from([0x3]), Buffer.from(cpuLoad.toString())]), rinfo.port, rinfo.address); + socket.send(Buffer.concat([Buffer.from([0x3]), Buffer.from((MAX_JOBS - jobAmount).toString())]), rinfo.port, rinfo.address); } else { log("Could not parse message"); } @@ -115,50 +90,54 @@ server.on("listening", () => { server.bind(8080); // ATTENTION: Always going to be bound to 0.0.0.0 !!! -const runJob = async (job) => { - log(`Job ${job.uuid} started`, job.threadNum); +const runJob = (job) => { + return new Promise(async (resolve, reject) => { + log(`Job ${job.uuid} starting...`, job.num); - const object = JSON.parse(job.msg); - let type; - if (object.path) { - type = object.type; - if (!object.type) { - type = await magick.getType(object.path); + const object = JSON.parse(job.msg); + let type; + if (object.path) { + type = object.type; + if (!object.type) { + type = await magick.getType(object.path); + } + if (!type) { + reject(new TypeError("Unknown image type")); + } + object.type = type.split("/")[1]; + if (object.type !== "gif" && object.onlyGIF) reject(new TypeError(`Expected a GIF, got ${object.type}`)); + object.delay = object.delay ? object.delay : 0; } - if (!type) { - throw new TypeError("Unknown image type"); - } - object.type = type.split("/")[1]; - if (object.type !== "gif" && object.onlyGIF) throw new TypeError(`Expected a GIF, got ${object.type}`); - object.delay = object.delay ? object.delay : 0; - } - if (object.type === "gif" && !object.delay) { - const delay = (await execPromise(`ffprobe -v 0 -of csv=p=0 -select_streams v:0 -show_entries stream=r_frame_rate ${object.path}`)).stdout.replace("\n", ""); - object.delay = (100 / delay.split("/")[0]) * delay.split("/")[1]; - } + if (object.type === "gif" && !object.delay) { + const delay = (await execPromise(`ffprobe -v 0 -of csv=p=0 -select_streams v:0 -show_entries stream=r_frame_rate ${object.path}`)).stdout.replace("\n", ""); + object.delay = (100 / delay.split("/")[0]) * delay.split("/")[1]; + } - const data = await magick.run(object, true); + log(`Job ${job.uuid} started`, job.num); + const data = await magick.run(object, true); - log(`Sending result of job ${job.uuid} back to the bot`, job.threadNum); - const server = net.createServer(function(tcpSocket) { - tcpSocket.write(Buffer.concat([Buffer.from(type ? type : "image/png"), Buffer.from("\n"), data]), (err) => { - if (err) console.error(err); - tcpSocket.end(() => { - server.close(); + log(`Sending result of job ${job.uuid} back to the bot`, job.num); + const server = net.createServer(function(tcpSocket) { + tcpSocket.write(Buffer.concat([Buffer.from(type ? type : "image/png"), Buffer.from("\n"), data]), (err) => { + if (err) console.error(err); + tcpSocket.end(() => { + server.close(); + resolve(null); + }); }); }); + server.listen(job.port, job.addr); + // handle address in use errors + server.on("error", (e) => { + if (e.code === "EADDRINUSE") { + log("Address in use, retrying...", job.num); + setTimeout(() => { + server.close(); + server.listen(job.port, job.addr); + }, 500); + } + }); + socket.send(Buffer.concat([Buffer.from([0x1]), Buffer.from(job.uuid), Buffer.from(job.port.toString())]), job.port, job.addr); }); - server.listen(job.port, job.addr); - // handle address in use errors - server.on("error", (e) => { - if (e.code === "EADDRINUSE") { - log("Address in use, retrying...", job.threadNum); - setTimeout(() => { - server.close(); - server.listen(job.port, job.addr); - }, 500); - } - }); - socket.send(Buffer.concat([Buffer.from([0x1]), Buffer.from(job.uuid), Buffer.from(job.port.toString())]), job.port, job.addr); }; \ No newline at end of file diff --git a/commands/soundreload.js b/commands/soundreload.js new file mode 100644 index 0000000..12219bb --- /dev/null +++ b/commands/soundreload.js @@ -0,0 +1,16 @@ +const soundPlayer = require("../utils/soundplayer.js"); + +exports.run = async (message) => { + if (message.author.id !== process.env.OWNER) return `${message.author.mention}, only the bot owner can reload Lavalink!`; + const soundStatus = await soundPlayer.checkStatus(); + if (!soundStatus) { + const length = await soundPlayer.connect(); + return `Successfully connected to ${length} Lavalink node(s).`; + } else { + return `${message.author.mention}, I couldn't connect to any Lavalink nodes!`; + } +}; + +exports.aliases = ["lava", "lavalink", "lavaconnect", "soundconnect"]; +exports.category = 8; +exports.help = "Attempts to reconnect to all available Lavalink nodes"; \ No newline at end of file diff --git a/utils/database/postgres.js b/utils/database/postgres.js index 74ee1c7..12fbfab 100644 --- a/utils/database/postgres.js +++ b/utils/database/postgres.js @@ -60,7 +60,8 @@ exports.getCounts = async () => { exports.addCount = async (command) => { const count = await connection.query("SELECT * FROM counts WHERE command = $1", [command]); - await connection.query("UPDATE counts SET count = $1 WHERE command = $2", [count.rows[0].count + 1, command]); + if (count.rows[0].count) await connection.query("INSERT INTO counts (command, count) VALUES ($1, $2)", [command, 0]); + await connection.query("UPDATE counts SET count = $1 WHERE command = $2", [count.rows[0].count ? count.rows[0].count + 1 : 1, command]); }; exports.addGuild = async (guild) => { diff --git a/utils/image.js b/utils/image.js index 186bd69..e1ad32c 100644 --- a/utils/image.js +++ b/utils/image.js @@ -14,7 +14,7 @@ const formats = ["image/jpeg", "image/png", "image/webp", "image/gif"]; const chooseServer = async (ideal) => { if (ideal.length === 0) throw "No available servers"; const sorted = ideal.sort((a, b) => { - return a.load - b.load; + return b.load - a.load; }); return sorted[0]; }; diff --git a/utils/soundplayer.js b/utils/soundplayer.js index c78b5a2..0b49626 100644 --- a/utils/soundplayer.js +++ b/utils/soundplayer.js @@ -44,6 +44,7 @@ exports.connect = async () => { this.manager.on("error", (error, node) => { logger.error(`An error occurred on Lavalink node ${node}: ${error}`); }); + return length; }; exports.play = async (sound, message, music = false) => {