A little bit more image work
This commit is contained in:
		
							parent
							
								
									a0a32c31b9
								
							
						
					
					
						commit
						4d3e1655e0
					
				
					 5 changed files with 123 additions and 145 deletions
				
			
		| 
						 | 
					@ -169,7 +169,7 @@ httpServer.on("error", (e) => {
 | 
				
			||||||
});
 | 
					});
 | 
				
			||||||
 | 
					
 | 
				
			||||||
httpServer.listen(8080, () => {
 | 
					httpServer.listen(8080, () => {
 | 
				
			||||||
  log("HTTP listening on port 8080");
 | 
					  log("HTTP and WS listening on port 8080");
 | 
				
			||||||
});
 | 
					});
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const runJob = (job, sock) => {
 | 
					const runJob = (job, sock) => {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -10,11 +10,11 @@ class ImageStatsCommand extends Command {
 | 
				
			||||||
          "icon_url": this.client.user.avatarURL
 | 
					          "icon_url": this.client.user.avatarURL
 | 
				
			||||||
        },
 | 
					        },
 | 
				
			||||||
        "color": 16711680,
 | 
					        "color": 16711680,
 | 
				
			||||||
        "description": `The bot is currently connected to ${image.connections.length} image server(s).`,
 | 
					        "description": `The bot is currently connected to ${image.connections.size} image server(s).`,
 | 
				
			||||||
        "fields": []
 | 
					        "fields": []
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    };
 | 
					    };
 | 
				
			||||||
    const servers = await image.getStatus();
 | 
					    const servers = await image.getRunning();
 | 
				
			||||||
    for (let i = 0; i < servers.length; i++) {
 | 
					    for (let i = 0; i < servers.length; i++) {
 | 
				
			||||||
      embed.embed.fields.push({
 | 
					      embed.embed.fields.push({
 | 
				
			||||||
        name: `Server ${i + 1}`,
 | 
					        name: `Server ${i + 1}`,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
							
								
								
									
										4
									
								
								shard.js
									
										
									
									
									
								
							
							
						
						
									
										4
									
								
								shard.js
									
										
									
									
									
								
							| 
						 | 
					@ -79,7 +79,7 @@ class Shard extends Base {
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        res.write(`# HELP connected_workers Number of workers connected
 | 
					        res.write(`# HELP connected_workers Number of workers connected
 | 
				
			||||||
# TYPE connected_workers gauge
 | 
					# TYPE connected_workers gauge
 | 
				
			||||||
connected_workers ${image.connections.length}
 | 
					connected_workers ${image.connections.size}
 | 
				
			||||||
# HELP running_jobs Number of running jobs on this worker
 | 
					# HELP running_jobs Number of running jobs on this worker
 | 
				
			||||||
# TYPE running_jobs gauge
 | 
					# TYPE running_jobs gauge
 | 
				
			||||||
# HELP queued_jobs Number of queued jobs on this worker
 | 
					# HELP queued_jobs Number of queued jobs on this worker
 | 
				
			||||||
| 
						 | 
					@ -89,7 +89,7 @@ connected_workers ${image.connections.length}
 | 
				
			||||||
# HELP command_count Number of times a command has been run
 | 
					# HELP command_count Number of times a command has been run
 | 
				
			||||||
# TYPE command_count counter
 | 
					# TYPE command_count counter
 | 
				
			||||||
`);
 | 
					`);
 | 
				
			||||||
        const servers = await image.getStatus();
 | 
					        const servers = await image.getRunning();
 | 
				
			||||||
        for (const [i, w] of servers.entries()) {
 | 
					        for (const [i, w] of servers.entries()) {
 | 
				
			||||||
          res.write(`running_jobs{worker="${i}"} ${w.runningJobs}\n`);
 | 
					          res.write(`running_jobs{worker="${i}"} ${w.runningJobs}\n`);
 | 
				
			||||||
          res.write(`queued_jobs{worker="${i}"} ${w.queued}\n`);
 | 
					          res.write(`queued_jobs{worker="${i}"} ${w.queued}\n`);
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
							
								
								
									
										256
									
								
								utils/image.js
									
										
									
									
									
								
							
							
						
						
									
										256
									
								
								utils/image.js
									
										
									
									
									
								
							| 
						 | 
					@ -30,90 +30,80 @@ exports.repopulate = async () => {
 | 
				
			||||||
  return;
 | 
					  return;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
exports.getStatus = () => {
 | 
					exports.getRunning = async () => {
 | 
				
			||||||
  return new Promise((resolve, reject) => {
 | 
					  let serversLeft = this.connections.size;
 | 
				
			||||||
    let serversLeft = this.connections.size;
 | 
					  const statuses = [];
 | 
				
			||||||
    const statuses = [];
 | 
					  for (const address of this.connections.keys()) {
 | 
				
			||||||
 | 
					    const connection = this.connections.get(address);
 | 
				
			||||||
 | 
					    if (connection.readyState !== 0 && connection.readyState !== 1) {
 | 
				
			||||||
 | 
					      serversLeft--;
 | 
				
			||||||
 | 
					      continue;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    const controller = new AbortController(); // eslint-disable-line no-undef
 | 
				
			||||||
    const timeout = setTimeout(() => {
 | 
					    const timeout = setTimeout(() => {
 | 
				
			||||||
      resolve(statuses);
 | 
					      controller.abort();
 | 
				
			||||||
    }, 5000);
 | 
					    }, 2000);
 | 
				
			||||||
    for (const address of this.connections.keys()) {
 | 
					    try {
 | 
				
			||||||
      const connection = this.connections.get(address);
 | 
					      const statusRequest = await fetch(`http://${address}:8080/running`, { signal: controller.signal });
 | 
				
			||||||
      if (connection.readyState !== 1) {
 | 
					      clearTimeout(timeout);
 | 
				
			||||||
 | 
					      const status = await statusRequest.json();
 | 
				
			||||||
 | 
					      serversLeft--;
 | 
				
			||||||
 | 
					      statuses.push(status);
 | 
				
			||||||
 | 
					    } catch (e) {
 | 
				
			||||||
 | 
					      if (e.name === "AbortError") {
 | 
				
			||||||
 | 
					        serversLeft--;
 | 
				
			||||||
 | 
					        continue;
 | 
				
			||||||
 | 
					      } else if (e.code === "ECONNREFUSED") {
 | 
				
			||||||
        serversLeft--;
 | 
					        serversLeft--;
 | 
				
			||||||
        continue;
 | 
					        continue;
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
      fetch(`http://${address}:8080/running`).then(statusRequest => statusRequest.json()).then((status) => {
 | 
					      throw e;
 | 
				
			||||||
        serversLeft--;
 | 
					 | 
				
			||||||
        statuses.push(status);
 | 
					 | 
				
			||||||
        if (!serversLeft) {
 | 
					 | 
				
			||||||
          clearTimeout(timeout);
 | 
					 | 
				
			||||||
          resolve(statuses);
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        return;
 | 
					 | 
				
			||||||
      }).catch(e => {
 | 
					 | 
				
			||||||
        if (e.code === "ECONNREFUSED") {
 | 
					 | 
				
			||||||
          serversLeft--;
 | 
					 | 
				
			||||||
          return;
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        reject(e);
 | 
					 | 
				
			||||||
      });
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    if (!serversLeft) {
 | 
					  }
 | 
				
			||||||
      clearTimeout(timeout);
 | 
					  if (!serversLeft) {
 | 
				
			||||||
      resolve(statuses);
 | 
					    return statuses;
 | 
				
			||||||
    }
 | 
					  } else {
 | 
				
			||||||
  });
 | 
					    throw new Error("Loop ended before all servers could be checked");
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
exports.connect = (server) => {
 | 
					exports.connect = (server) => {
 | 
				
			||||||
  return new Promise((resolve, reject) => {
 | 
					  const connection = new WebSocket(`ws://${server}:8080/sock`);
 | 
				
			||||||
    const connection = new WebSocket(`ws://${server}:8080/sock`);
 | 
					  connection.on("message", async (msg) => {
 | 
				
			||||||
    const timeout = setTimeout(() => {
 | 
					    const opcode = msg.readUint8(0);
 | 
				
			||||||
      this.connections.delete(server);
 | 
					    const req = msg.slice(37, msg.length);
 | 
				
			||||||
      reject(`Failed to connect to ${server}`);
 | 
					    const uuid = msg.slice(1, 37).toString();
 | 
				
			||||||
    }, 5000);
 | 
					    if (opcode === 0x00) { // Job queued
 | 
				
			||||||
    connection.once("open", () => {
 | 
					      if (this.jobs[req]) {
 | 
				
			||||||
      clearTimeout(timeout);
 | 
					        this.jobs[req].event.emit("uuid", uuid);
 | 
				
			||||||
    });
 | 
					      }
 | 
				
			||||||
    connection.on("message", async (msg) => {
 | 
					    } else if (opcode === 0x01) { // Job completed successfully
 | 
				
			||||||
      const opcode = msg.readUint8(0);
 | 
					      // the image API sends all job responses over the same socket; make sure this is ours
 | 
				
			||||||
      const req = msg.slice(37, msg.length);
 | 
					      if (this.jobs[uuid]) {
 | 
				
			||||||
      const uuid = msg.slice(1, 37).toString();
 | 
					        const imageReq = await fetch(`http://${server}:8080/image?id=${uuid}`);
 | 
				
			||||||
      if (opcode === 0x00) { // Job queued
 | 
					        const image = await imageReq.buffer();
 | 
				
			||||||
        if (this.jobs[req]) {
 | 
					        // The response data is given as the file extension/ImageMagick type of the image (e.g. "png"), followed
 | 
				
			||||||
          this.jobs[req].event.emit("uuid", uuid);
 | 
					        // by a newline, followed by the image data.
 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
      } else if (opcode === 0x01) { // Job completed successfully
 | 
					 | 
				
			||||||
        // the image API sends all job responses over the same socket; make sure this is ours
 | 
					 | 
				
			||||||
        if (this.jobs[uuid]) {
 | 
					 | 
				
			||||||
          const imageReq = await fetch(`http://${server}:8080/image?id=${uuid}`);
 | 
					 | 
				
			||||||
          const image = await imageReq.buffer();
 | 
					 | 
				
			||||||
          // The response data is given as the file extension/ImageMagick type of the image (e.g. "png"), followed
 | 
					 | 
				
			||||||
          // by a newline, followed by the image data.
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
          this.jobs[uuid].event.emit("image", image, imageReq.headers.get("ext"));
 | 
					        this.jobs[uuid].event.emit("image", image, imageReq.headers.get("ext"));
 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
      } else if (opcode === 0x02) { // Job errored
 | 
					 | 
				
			||||||
        if (this.jobs[uuid]) {
 | 
					 | 
				
			||||||
          this.jobs[uuid].event.emit("error", new Error(req));
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    });
 | 
					    } else if (opcode === 0x02) { // Job errored
 | 
				
			||||||
    connection.on("error", (e) => {
 | 
					      if (this.jobs[uuid]) {
 | 
				
			||||||
      logger.error(e.toString());
 | 
					        this.jobs[uuid].event.emit("error", new Error(req));
 | 
				
			||||||
    });
 | 
					 | 
				
			||||||
    connection.once("close", () => {
 | 
					 | 
				
			||||||
      for (const uuid of Object.keys(this.jobs)) {
 | 
					 | 
				
			||||||
        if (this.jobs[uuid].addr === server) this.jobs[uuid].event.emit("error", "Job ended prematurely due to a closed connection; please run your image job again");
 | 
					 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
      logger.log(`Lost connection to ${server}, attempting to reconnect...`);
 | 
					    }
 | 
				
			||||||
      this.connections.delete(server);
 | 
					 | 
				
			||||||
      //this.connect(server);
 | 
					 | 
				
			||||||
    });
 | 
					 | 
				
			||||||
    this.connections.set(server, connection);
 | 
					 | 
				
			||||||
    resolve();
 | 
					 | 
				
			||||||
  });
 | 
					  });
 | 
				
			||||||
 | 
					  connection.on("error", (e) => {
 | 
				
			||||||
 | 
					    logger.error(e.toString());
 | 
				
			||||||
 | 
					  });
 | 
				
			||||||
 | 
					  connection.once("close", () => {
 | 
				
			||||||
 | 
					    for (const uuid of Object.keys(this.jobs)) {
 | 
				
			||||||
 | 
					      if (this.jobs[uuid].addr === server) this.jobs[uuid].event.emit("error", "Job ended prematurely due to a closed connection; please run your image job again");
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    //logger.log(`Lost connection to ${server}, attempting to reconnect...`);
 | 
				
			||||||
 | 
					    this.connections.delete(server);
 | 
				
			||||||
 | 
					  });
 | 
				
			||||||
 | 
					  this.connections.set(server, connection);
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
exports.disconnect = async () => {
 | 
					exports.disconnect = async () => {
 | 
				
			||||||
| 
						 | 
					@ -128,84 +118,72 @@ exports.disconnect = async () => {
 | 
				
			||||||
  return;
 | 
					  return;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const getIdeal = () => {
 | 
					const getIdeal = async () => {
 | 
				
			||||||
  return new Promise((resolve, reject) => {
 | 
					  let serversLeft = this.connections.size;
 | 
				
			||||||
    let serversLeft = this.connections.size;
 | 
					  if (serversLeft === 0) {
 | 
				
			||||||
    if (serversLeft === 0) {
 | 
					    for (const server of this.servers) {
 | 
				
			||||||
      for (const server of this.servers) {
 | 
					 | 
				
			||||||
        this.connect(server).catch(e => {
 | 
					 | 
				
			||||||
          logger.error(e);
 | 
					 | 
				
			||||||
        });
 | 
					 | 
				
			||||||
      }
 | 
					 | 
				
			||||||
      serversLeft = this.connections.size;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    const idealServers = [];
 | 
					 | 
				
			||||||
    const timeout = setTimeout(async () => {
 | 
					 | 
				
			||||||
      try {
 | 
					      try {
 | 
				
			||||||
        const server = await chooseServer(idealServers);
 | 
					        await this.connect(server);
 | 
				
			||||||
        resolve({ addr: server.addr, sock: this.connections.get(server.addr) });
 | 
					 | 
				
			||||||
      } catch (e) {
 | 
					      } catch (e) {
 | 
				
			||||||
        reject(e);
 | 
					        logger.error(e);
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }, 5000);
 | 
					    }
 | 
				
			||||||
    for (const address of this.connections.keys()) {
 | 
					    serversLeft = this.connections.size;
 | 
				
			||||||
      const connection = this.connections.get(address);
 | 
					  }
 | 
				
			||||||
      if (connection.readyState !== 1) {
 | 
					  const idealServers = [];
 | 
				
			||||||
 | 
					  for (const address of this.connections.keys()) {
 | 
				
			||||||
 | 
					    const connection = this.connections.get(address);
 | 
				
			||||||
 | 
					    if (connection.readyState !== 0 && connection.readyState !== 1) {
 | 
				
			||||||
 | 
					      serversLeft--;
 | 
				
			||||||
 | 
					      continue;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    const controller = new AbortController(); // eslint-disable-line no-undef
 | 
				
			||||||
 | 
					    const timeout = setTimeout(() => {
 | 
				
			||||||
 | 
					      controller.abort();
 | 
				
			||||||
 | 
					    }, 2000);
 | 
				
			||||||
 | 
					    try {
 | 
				
			||||||
 | 
					      const statusRequest = await fetch(`http://${address}:8080/status`, { signal: controller.signal });
 | 
				
			||||||
 | 
					      clearTimeout(timeout);
 | 
				
			||||||
 | 
					      const status = await statusRequest.text();
 | 
				
			||||||
 | 
					      serversLeft--;
 | 
				
			||||||
 | 
					      idealServers.push({
 | 
				
			||||||
 | 
					        addr: address,
 | 
				
			||||||
 | 
					        load: parseInt(status)
 | 
				
			||||||
 | 
					      });
 | 
				
			||||||
 | 
					    } catch (e) {
 | 
				
			||||||
 | 
					      if (e.name === "AbortError") {
 | 
				
			||||||
 | 
					        serversLeft--;
 | 
				
			||||||
 | 
					        continue;
 | 
				
			||||||
 | 
					      } else if (e.code === "ECONNREFUSED") {
 | 
				
			||||||
        serversLeft--;
 | 
					        serversLeft--;
 | 
				
			||||||
        continue;
 | 
					        continue;
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
      fetch(`http://${address}:8080/status`).then(statusRequest => statusRequest.text()).then((status) => {
 | 
					      throw e;
 | 
				
			||||||
        serversLeft--;
 | 
					    } finally {
 | 
				
			||||||
        idealServers.push({
 | 
					 | 
				
			||||||
          addr: address,
 | 
					 | 
				
			||||||
          load: parseInt(status)
 | 
					 | 
				
			||||||
        });
 | 
					 | 
				
			||||||
        return;
 | 
					 | 
				
			||||||
      }).then(async () => {
 | 
					 | 
				
			||||||
        if (!serversLeft) {
 | 
					 | 
				
			||||||
          clearTimeout(timeout);
 | 
					 | 
				
			||||||
          const server = await chooseServer(idealServers);
 | 
					 | 
				
			||||||
          resolve({ addr: server.addr, sock: this.connections.get(server.addr) });
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
      }).catch(e => {
 | 
					 | 
				
			||||||
        if (e.code === "ECONNREFUSED") {
 | 
					 | 
				
			||||||
          serversLeft--;
 | 
					 | 
				
			||||||
          return;
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        reject(e);
 | 
					 | 
				
			||||||
      });
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    if (!serversLeft) {
 | 
					 | 
				
			||||||
      clearTimeout(timeout);
 | 
					      clearTimeout(timeout);
 | 
				
			||||||
      chooseServer(idealServers).then(server => {
 | 
					 | 
				
			||||||
        resolve({ addr: server.addr, sock: this.connections.get(server.addr) });
 | 
					 | 
				
			||||||
      }).catch(e => reject(e));
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  });
 | 
					  }
 | 
				
			||||||
 | 
					  if (!serversLeft) {
 | 
				
			||||||
 | 
					    const server = await chooseServer(idealServers);
 | 
				
			||||||
 | 
					    return { addr: server.addr, sock: this.connections.get(server.addr) };
 | 
				
			||||||
 | 
					  } else {
 | 
				
			||||||
 | 
					    throw new Error("Loop ended before all servers could be checked");
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const start = (object, num) => {
 | 
					const start = async (object, num) => {
 | 
				
			||||||
  return getIdeal().then(async (currentServer) => {
 | 
					  const currentServer = await getIdeal();
 | 
				
			||||||
    const data = Buffer.concat([Buffer.from([0x01 /* queue job */]), Buffer.from(num.length.toString()), Buffer.from(num), Buffer.from(JSON.stringify(object))]);
 | 
					  const data = Buffer.concat([Buffer.from([0x01 /* queue job */]), Buffer.from(num.length.toString()), Buffer.from(num), Buffer.from(JSON.stringify(object))]);
 | 
				
			||||||
    return new Promise((resolve) => {
 | 
					  currentServer.sock.send(data);
 | 
				
			||||||
      currentServer.sock.send(data, () => {
 | 
					  const event = new EventEmitter();
 | 
				
			||||||
        resolve(currentServer.addr);
 | 
					  this.jobs[num] = { event, addr: currentServer.addr };
 | 
				
			||||||
      });
 | 
					  const uuid = await new Promise((resolve, reject) => {
 | 
				
			||||||
    });
 | 
					    event.once("uuid", (uuid) => resolve(uuid));
 | 
				
			||||||
  }).catch((result) => {
 | 
					    event.once("error", reject);
 | 
				
			||||||
    throw result;
 | 
					 | 
				
			||||||
  }).then((addr) => {
 | 
					 | 
				
			||||||
    const event = new EventEmitter();
 | 
					 | 
				
			||||||
    return new Promise((resolve, reject) => {
 | 
					 | 
				
			||||||
      event.once("uuid", (uuid) => resolve({ event, uuid, addr }));
 | 
					 | 
				
			||||||
      event.once("error", reject);
 | 
					 | 
				
			||||||
      this.jobs[num] = { event, addr };
 | 
					 | 
				
			||||||
    });
 | 
					 | 
				
			||||||
  }).then(data => {
 | 
					 | 
				
			||||||
    delete this.jobs[num];
 | 
					 | 
				
			||||||
    this.jobs[data.uuid] = { event: data.event, addr: data.addr };
 | 
					 | 
				
			||||||
    return { uuid: data.uuid, event: data.event };
 | 
					 | 
				
			||||||
  });
 | 
					  });
 | 
				
			||||||
 | 
					  delete this.jobs[num];
 | 
				
			||||||
 | 
					  this.jobs[uuid] = { event: event, addr: currentServer.addr };
 | 
				
			||||||
 | 
					  return { uuid: uuid, event: event };
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
exports.check = (cmd) => {
 | 
					exports.check = (cmd) => {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -7,7 +7,7 @@ module.exports = (input, allowed) => {
 | 
				
			||||||
    const a = input[i];
 | 
					    const a = input[i];
 | 
				
			||||||
    if (a.startsWith("--") && !curr && isAllowed(a.slice(2).split("=")[0], allowed)) {
 | 
					    if (a.startsWith("--") && !curr && isAllowed(a.slice(2).split("=")[0], allowed)) {
 | 
				
			||||||
      if (a.includes("=")) {
 | 
					      if (a.includes("=")) {
 | 
				
			||||||
        const [ arg, value ] = a.slice(2).split("=");
 | 
					        const [arg, value] = a.slice(2).split("=");
 | 
				
			||||||
        let ended = true;
 | 
					        let ended = true;
 | 
				
			||||||
        if (value.startsWith("\"")) {
 | 
					        if (value.startsWith("\"")) {
 | 
				
			||||||
          if (!value.endsWith("\"")) {
 | 
					          if (!value.endsWith("\"")) {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue