From 7a254a5139f5af8272efb9403f18bab70e41347b Mon Sep 17 00:00:00 2001 From: TheEssem Date: Fri, 25 Dec 2020 20:27:45 -0600 Subject: [PATCH] Added load-based api node selection --- Dockerfile.api | 2 +- api/index.js | 31 +++++++++++++++++++++++++- package.json | 2 +- utils/image.js | 59 ++++++++++++++++++++++++++++++++++++++++++++++---- 4 files changed, 87 insertions(+), 7 deletions(-) diff --git a/Dockerfile.api b/Dockerfile.api index f2245ec..cacce80 100644 --- a/Dockerfile.api +++ b/Dockerfile.api @@ -45,6 +45,6 @@ COPY . . RUN npm run build USER esmBot -EXPOSE 3000 +EXPOSE 8080 ENTRYPOINT ["node", "api/index.js"] \ No newline at end of file diff --git a/api/index.js b/api/index.js index 4d9acb7..d5bd25e 100644 --- a/api/index.js +++ b/api/index.js @@ -9,7 +9,7 @@ const net = require("net"); const dgram = require("dgram"); // for UDP servers const socket = dgram.createSocket("udp4"); // Our universal UDP socket, this might cause issues and we may have to use a seperate socket for each connection -var start = process.hrtime(); +const start = process.hrtime(); const log = (msg, thread) => { console.log(`[${process.hrtime(start)[1] / 1000000}${(thread)?`:${thread}`:""}]\t ${msg}`); }; @@ -21,6 +21,32 @@ const queue = []; if (isMainThread) { const { v4: uuidv4 } = require("uuid"); + let cpuLoad; + + const getAverage = () => { + const cpus = os.cpus(); + let idle = 0; + let tick = 0; + for (const cpu of cpus) { + for (const type in cpu.times) { + tick += cpu.times[type]; + } + idle += cpu.times.idle; + } + return { + idle: idle / cpus.length, + tick: tick / cpus.length + }; + }; + + let measure = getAverage(); + setInterval(() => { + const newMeasure = getAverage(); + const idleDiff = newMeasure.idle - measure.idle; + const tickDiff = newMeasure.tick - measure.tick; + cpuLoad = 100 - ~~(100 * idleDiff / tickDiff); + measure = newMeasure; + }, 5000); const MAX_WORKERS = process.env.WORKERS === "" ? parseInt(process.env.WORKERS) : os.cpus().length * 4; // Completely arbitrary, should usually be some multiple of your amount of cores let workingWorkers = 0; @@ -73,6 +99,7 @@ if (isMainThread) { const req = msg.toString().slice(1,msg.length); // 0x0 == Cancel job // 0x1 == Queue job + // 0x2 == Get CPU usage if (opcode == 0x0) { queue.shift(); delete jobs[req]; @@ -88,6 +115,8 @@ if (isMainThread) { const newBuffer = Buffer.concat([Buffer.from([0x0]), Buffer.from(uuid)]); socket.send(newBuffer, rinfo.port, rinfo.address); + } else if (opcode == 0x2) { + socket.send(Buffer.concat([Buffer.from([0x3]), Buffer.from(cpuLoad.toString())]), rinfo.port, rinfo.address); } else { log("Could not parse message"); } diff --git a/package.json b/package.json index 53abb4c..4ccbc09 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,7 @@ }, "scripts": { "build": "node-gyp configure build", - "docker:build-api": "docker build -t esmbot-api --no-cache -f Dockerfile.api .", + "docker:build-api": "docker build -t esmbot-api -f Dockerfile.api .", "docker:build-ss": "docker build -t headless-chrome-alpine --no-cache ./utils/screenshot", "docker:run-api": "docker run --rm --network=host esmbot-api", "docker:run-ss": "docker run --rm --network=host --shm-size=128m headless-chrome-alpine", diff --git a/utils/image.js b/utils/image.js index 4ae726f..d932231 100644 --- a/utils/image.js +++ b/utils/image.js @@ -11,6 +11,57 @@ const servers = require("../servers.json").image; const formats = ["image/jpeg", "image/png", "image/webp", "image/gif"]; +const chooseServer = async (ideal) => { + if (ideal.length === 0) throw "No available servers"; + const sorted = ideal.sort((a, b) => { + return a.load - b.load; + }); + return sorted[0]; +}; + +const getIdeal = () => { + return new Promise((resolve, reject) => { + const socket = dgram.createSocket("udp4"); + let serversLeft = servers.length; + const idealServers = []; + const timeout = setTimeout(() => { + socket.close(async () => { + try { + const server = await chooseServer(idealServers); + resolve(server); + } catch (e) { + reject(e); + } + }); + }, 5000); + socket.on("message", async (msg, rinfo) => { + const opcode = msg.readUint8(0); + const res = parseInt(msg.slice(1, msg.length).toString()); + if (opcode === 0x3) { + serversLeft--; + idealServers.push({ + addr: rinfo.address, + load: res + }); + if (!serversLeft) { + clearTimeout(timeout); + try { + const server = await chooseServer(idealServers); + resolve(server); + } catch (e) { + reject(e); + } + } + } + }); + for (const server of servers) { + socket.send(Buffer.from([0x2]), 8080, server, (err) => { + if (err) reject(err); + }); + } + }); +}; + const getFormat = (buffer, delimiter) => { for (var i = 0; i < buffer.length ; i++) { if (String.fromCharCode(buffer[i]) === delimiter) { @@ -61,7 +112,7 @@ exports.getType = async (image) => { exports.run = (object, fromAPI = false) => { return new Promise(async (resolve, reject) => { if (process.env.API === "true" && !fromAPI) { - const currentServer = servers[Math.floor(Math.random() * servers.length)]; + const currentServer = await getIdeal(); const socket = dgram.createSocket("udp4"); const data = Buffer.concat([Buffer.from([0x1]), Buffer.from(JSON.stringify(object))]); @@ -79,7 +130,7 @@ exports.run = (object, fromAPI = false) => { jobID = uuid; } else if (opcode === 0x1) { if (jobID === uuid) { - const client = net.createConnection(req.toString(), currentServer); + const client = net.createConnection(req.toString(), currentServer.addr); const array = []; client.on("data", (rawData) => { array.push(rawData); @@ -95,7 +146,7 @@ exports.run = (object, fromAPI = false) => { resolve(payload); }); client.on("error", (err) => { - throw err; + reject(err); }); } } else if (opcode === 0x2) { @@ -103,7 +154,7 @@ exports.run = (object, fromAPI = false) => { } }); - socket.send(data, 8080, currentServer, (err) => { + socket.send(data, 8080, currentServer.addr, (err) => { if (err) reject(err); }); } else if (isMainThread && !fromAPI) {