Made image API node chooser sort by job amount instead of CPU usage, added soundreload
This commit is contained in:
		
							parent
							
								
									55da0db479
								
							
						
					
					
						commit
						c2bd68263b
					
				
					 5 changed files with 70 additions and 73 deletions
				
			
		
							
								
								
									
										57
									
								
								api/index.js
									
										
									
									
									
								
							
							
						
						
									
										57
									
								
								api/index.js
									
										
									
									
									
								
							| 
						 | 
					@ -9,8 +9,8 @@ const dgram = require("dgram"); // for UDP servers
 | 
				
			||||||
const socket = dgram.createSocket("udp4"); // Our universal UDP socket, this might cause issues and we may have to use a seperate socket for each connection
 | 
					const socket = dgram.createSocket("udp4"); // Our universal UDP socket, this might cause issues and we may have to use a seperate socket for each connection
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const start = process.hrtime();
 | 
					const start = process.hrtime();
 | 
				
			||||||
const log = (msg) => {
 | 
					const log = (msg, jobNum) => {
 | 
				
			||||||
  console.log(`[${process.hrtime(start)[1] / 1000000}]\t ${msg}`);
 | 
					  console.log(`[${process.hrtime(start)[1] / 1000000}${jobNum !== undefined ? `:${jobNum}` : ""}]\t ${msg}`);
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const jobs = {};
 | 
					const jobs = {};
 | 
				
			||||||
| 
						 | 
					@ -19,32 +19,6 @@ const queue = [];
 | 
				
			||||||
// Array of UUIDs
 | 
					// Array of UUIDs
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const { v4: uuidv4 } = require("uuid");
 | 
					const { v4: uuidv4 } = require("uuid");
 | 
				
			||||||
let cpuLoad = 0;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
const getAverage = () => {
 | 
					 | 
				
			||||||
  const cpus = os.cpus();
 | 
					 | 
				
			||||||
  let idle = 0;
 | 
					 | 
				
			||||||
  let tick = 0;
 | 
					 | 
				
			||||||
  for (const cpu of cpus) {
 | 
					 | 
				
			||||||
    for (const type in cpu.times) {
 | 
					 | 
				
			||||||
      tick += cpu.times[type];
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    idle += cpu.times.idle;
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
  return {
 | 
					 | 
				
			||||||
    idle: idle / cpus.length,
 | 
					 | 
				
			||||||
    tick: tick / cpus.length
 | 
					 | 
				
			||||||
  };
 | 
					 | 
				
			||||||
};
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
let measure = getAverage();
 | 
					 | 
				
			||||||
setInterval(() => {
 | 
					 | 
				
			||||||
  const newMeasure = getAverage();
 | 
					 | 
				
			||||||
  const idleDiff = newMeasure.idle - measure.idle;
 | 
					 | 
				
			||||||
  const tickDiff = newMeasure.tick - measure.tick;
 | 
					 | 
				
			||||||
  cpuLoad = 100 - ~~(100 * idleDiff / tickDiff);
 | 
					 | 
				
			||||||
  measure = newMeasure;
 | 
					 | 
				
			||||||
}, 5000);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
const MAX_JOBS = process.env.JOBS !== "" && process.env.JOBS !== undefined ? parseInt(process.env.JOBS) : os.cpus().length * 4; // Completely arbitrary, should usually be some multiple of your amount of cores
 | 
					const MAX_JOBS = process.env.JOBS !== "" && process.env.JOBS !== undefined ? parseInt(process.env.JOBS) : os.cpus().length * 4; // Completely arbitrary, should usually be some multiple of your amount of cores
 | 
				
			||||||
let jobAmount = 0;
 | 
					let jobAmount = 0;
 | 
				
			||||||
| 
						 | 
					@ -57,7 +31,8 @@ const acceptJob = async (uuid) => {
 | 
				
			||||||
      uuid: uuid,
 | 
					      uuid: uuid,
 | 
				
			||||||
      msg: jobs[uuid].msg,
 | 
					      msg: jobs[uuid].msg,
 | 
				
			||||||
      addr: jobs[uuid].addr,
 | 
					      addr: jobs[uuid].addr,
 | 
				
			||||||
      port: jobs[uuid].port
 | 
					      port: jobs[uuid].port,
 | 
				
			||||||
 | 
					      num: jobs[uuid].num
 | 
				
			||||||
    });
 | 
					    });
 | 
				
			||||||
    jobAmount--;
 | 
					    jobAmount--;
 | 
				
			||||||
    if (queue.length > 0) {
 | 
					    if (queue.length > 0) {
 | 
				
			||||||
| 
						 | 
					@ -87,22 +62,22 @@ server.on("message", (msg, rinfo) => {
 | 
				
			||||||
    delete queue[queue.indexOf(req) - 1];
 | 
					    delete queue[queue.indexOf(req) - 1];
 | 
				
			||||||
    delete jobs[req];
 | 
					    delete jobs[req];
 | 
				
			||||||
  } else if (opcode == 0x1) {
 | 
					  } else if (opcode == 0x1) {
 | 
				
			||||||
    const job = { addr: rinfo.address, port: rinfo.port, msg: req };
 | 
					    const job = { addr: rinfo.address, port: rinfo.port, msg: req, num: jobAmount };
 | 
				
			||||||
    const uuid = uuidv4();
 | 
					    const uuid = uuidv4();
 | 
				
			||||||
    jobs[uuid] = job;
 | 
					    jobs[uuid] = job;
 | 
				
			||||||
    queue.push(uuid);
 | 
					    queue.push(uuid);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if (jobAmount < MAX_JOBS) {
 | 
					    if (jobAmount < MAX_JOBS) {
 | 
				
			||||||
      log(`Got request for job ${job.msg} with id ${uuid}`, job.threadNum);
 | 
					      log(`Got request for job ${job.msg} with id ${uuid}`, job.num);
 | 
				
			||||||
      acceptJob(uuid);
 | 
					      acceptJob(uuid);
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
      log(`Got request for job ${job.msg} with id ${uuid}, queued in position ${queue.indexOf(uuid)}`, job.threadNum);
 | 
					      log(`Got request for job ${job.msg} with id ${uuid}, queued in position ${queue.indexOf(uuid)}`, job.num);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    const newBuffer = Buffer.concat([Buffer.from([0x0]), Buffer.from(uuid)]);
 | 
					    const newBuffer = Buffer.concat([Buffer.from([0x0]), Buffer.from(uuid)]);
 | 
				
			||||||
    socket.send(newBuffer, rinfo.port, rinfo.address);
 | 
					    socket.send(newBuffer, rinfo.port, rinfo.address);
 | 
				
			||||||
  } else if (opcode == 0x2) {
 | 
					  } else if (opcode == 0x2) {
 | 
				
			||||||
    socket.send(Buffer.concat([Buffer.from([0x3]), Buffer.from(cpuLoad.toString())]), rinfo.port, rinfo.address);
 | 
					    socket.send(Buffer.concat([Buffer.from([0x3]), Buffer.from((MAX_JOBS - jobAmount).toString())]), rinfo.port, rinfo.address);
 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    log("Could not parse message");
 | 
					    log("Could not parse message");
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
| 
						 | 
					@ -115,8 +90,9 @@ server.on("listening", () => {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
server.bind(8080); // ATTENTION: Always going to be bound to 0.0.0.0 !!!
 | 
					server.bind(8080); // ATTENTION: Always going to be bound to 0.0.0.0 !!!
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const runJob = async (job) => {
 | 
					const runJob = (job) => {
 | 
				
			||||||
  log(`Job ${job.uuid} started`, job.threadNum);
 | 
					  return new Promise(async (resolve, reject) => {
 | 
				
			||||||
 | 
					    log(`Job ${job.uuid} starting...`, job.num);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    const object = JSON.parse(job.msg);
 | 
					    const object = JSON.parse(job.msg);
 | 
				
			||||||
    let type;
 | 
					    let type;
 | 
				
			||||||
| 
						 | 
					@ -126,10 +102,10 @@ const runJob = async (job) => {
 | 
				
			||||||
        type = await magick.getType(object.path);
 | 
					        type = await magick.getType(object.path);
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
      if (!type) {
 | 
					      if (!type) {
 | 
				
			||||||
      throw new TypeError("Unknown image type");
 | 
					        reject(new TypeError("Unknown image type"));
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
      object.type = type.split("/")[1];
 | 
					      object.type = type.split("/")[1];
 | 
				
			||||||
    if (object.type !== "gif" && object.onlyGIF) throw new TypeError(`Expected a GIF, got ${object.type}`);
 | 
					      if (object.type !== "gif" && object.onlyGIF) reject(new TypeError(`Expected a GIF, got ${object.type}`));
 | 
				
			||||||
      object.delay = object.delay ? object.delay : 0;
 | 
					      object.delay = object.delay ? object.delay : 0;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
      
 | 
					      
 | 
				
			||||||
| 
						 | 
					@ -138,14 +114,16 @@ const runJob = async (job) => {
 | 
				
			||||||
      object.delay = (100 / delay.split("/")[0]) * delay.split("/")[1];
 | 
					      object.delay = (100 / delay.split("/")[0]) * delay.split("/")[1];
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    log(`Job ${job.uuid} started`, job.num);
 | 
				
			||||||
    const data = await magick.run(object, true);
 | 
					    const data = await magick.run(object, true);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  log(`Sending result of job ${job.uuid} back to the bot`, job.threadNum);
 | 
					    log(`Sending result of job ${job.uuid} back to the bot`, job.num);
 | 
				
			||||||
    const server = net.createServer(function(tcpSocket) {
 | 
					    const server = net.createServer(function(tcpSocket) {
 | 
				
			||||||
      tcpSocket.write(Buffer.concat([Buffer.from(type ? type : "image/png"), Buffer.from("\n"), data]), (err) => {
 | 
					      tcpSocket.write(Buffer.concat([Buffer.from(type ? type : "image/png"), Buffer.from("\n"), data]), (err) => {
 | 
				
			||||||
        if (err) console.error(err);
 | 
					        if (err) console.error(err);
 | 
				
			||||||
        tcpSocket.end(() => {
 | 
					        tcpSocket.end(() => {
 | 
				
			||||||
          server.close();
 | 
					          server.close();
 | 
				
			||||||
 | 
					          resolve(null);
 | 
				
			||||||
        });
 | 
					        });
 | 
				
			||||||
      });
 | 
					      });
 | 
				
			||||||
    });
 | 
					    });
 | 
				
			||||||
| 
						 | 
					@ -153,7 +131,7 @@ const runJob = async (job) => {
 | 
				
			||||||
    // handle address in use errors
 | 
					    // handle address in use errors
 | 
				
			||||||
    server.on("error", (e) => {
 | 
					    server.on("error", (e) => {
 | 
				
			||||||
      if (e.code === "EADDRINUSE") {
 | 
					      if (e.code === "EADDRINUSE") {
 | 
				
			||||||
      log("Address in use, retrying...", job.threadNum);
 | 
					        log("Address in use, retrying...", job.num);
 | 
				
			||||||
        setTimeout(() => {
 | 
					        setTimeout(() => {
 | 
				
			||||||
          server.close();
 | 
					          server.close();
 | 
				
			||||||
          server.listen(job.port, job.addr);
 | 
					          server.listen(job.port, job.addr);
 | 
				
			||||||
| 
						 | 
					@ -161,4 +139,5 @@ const runJob = async (job) => {
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    });
 | 
					    });
 | 
				
			||||||
    socket.send(Buffer.concat([Buffer.from([0x1]), Buffer.from(job.uuid), Buffer.from(job.port.toString())]), job.port, job.addr);
 | 
					    socket.send(Buffer.concat([Buffer.from([0x1]), Buffer.from(job.uuid), Buffer.from(job.port.toString())]), job.port, job.addr);
 | 
				
			||||||
 | 
					  });
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
							
								
								
									
										16
									
								
								commands/soundreload.js
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										16
									
								
								commands/soundreload.js
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
					@ -0,0 +1,16 @@
 | 
				
			||||||
 | 
					const soundPlayer = require("../utils/soundplayer.js");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					exports.run = async (message) => {
 | 
				
			||||||
 | 
					  if (message.author.id !== process.env.OWNER) return `${message.author.mention}, only the bot owner can reload Lavalink!`;
 | 
				
			||||||
 | 
					  const soundStatus = await soundPlayer.checkStatus();
 | 
				
			||||||
 | 
					  if (!soundStatus) {
 | 
				
			||||||
 | 
					    const length = await soundPlayer.connect();
 | 
				
			||||||
 | 
					    return `Successfully connected to ${length} Lavalink node(s).`;
 | 
				
			||||||
 | 
					  } else {
 | 
				
			||||||
 | 
					    return `${message.author.mention}, I couldn't connect to any Lavalink nodes!`;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					exports.aliases = ["lava", "lavalink", "lavaconnect", "soundconnect"];
 | 
				
			||||||
 | 
					exports.category = 8;
 | 
				
			||||||
 | 
					exports.help = "Attempts to reconnect to all available Lavalink nodes";
 | 
				
			||||||
| 
						 | 
					@ -60,7 +60,8 @@ exports.getCounts = async () => {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
exports.addCount = async (command) => {
 | 
					exports.addCount = async (command) => {
 | 
				
			||||||
  const count = await connection.query("SELECT * FROM counts WHERE command = $1", [command]);
 | 
					  const count = await connection.query("SELECT * FROM counts WHERE command = $1", [command]);
 | 
				
			||||||
  await connection.query("UPDATE counts SET count = $1 WHERE command = $2", [count.rows[0].count + 1, command]);
 | 
					  if (count.rows[0].count) await connection.query("INSERT INTO counts (command, count) VALUES ($1, $2)", [command, 0]);
 | 
				
			||||||
 | 
					  await connection.query("UPDATE counts SET count = $1 WHERE command = $2", [count.rows[0].count ? count.rows[0].count + 1 : 1, command]);
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
exports.addGuild = async (guild) => {
 | 
					exports.addGuild = async (guild) => {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -14,7 +14,7 @@ const formats = ["image/jpeg", "image/png", "image/webp", "image/gif"];
 | 
				
			||||||
const chooseServer = async (ideal) => {
 | 
					const chooseServer = async (ideal) => {
 | 
				
			||||||
  if (ideal.length === 0) throw "No available servers";
 | 
					  if (ideal.length === 0) throw "No available servers";
 | 
				
			||||||
  const sorted = ideal.sort((a, b) => {
 | 
					  const sorted = ideal.sort((a, b) => {
 | 
				
			||||||
    return a.load - b.load;
 | 
					    return b.load - a.load;
 | 
				
			||||||
  });
 | 
					  });
 | 
				
			||||||
  return sorted[0];
 | 
					  return sorted[0];
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -44,6 +44,7 @@ exports.connect = async () => {
 | 
				
			||||||
  this.manager.on("error", (error, node) => {
 | 
					  this.manager.on("error", (error, node) => {
 | 
				
			||||||
    logger.error(`An error occurred on Lavalink node ${node}: ${error}`);
 | 
					    logger.error(`An error occurred on Lavalink node ${node}: ${error}`);
 | 
				
			||||||
  });
 | 
					  });
 | 
				
			||||||
 | 
					  return length;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
exports.play = async (sound, message, music = false) => {
 | 
					exports.play = async (sound, message, music = false) => {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue