Add initial Azure Functions support, clean up gitignore

This commit is contained in:
Essem 2022-04-17 10:40:56 -05:00
parent dd7bd6b4cc
commit 6bf0537c29
No known key found for this signature in database
GPG key ID: 7D497397CC3A2A8C
29 changed files with 267 additions and 65 deletions

View file

@ -1,10 +1,13 @@
import { createRequire } from "module";
import { isMainThread, parentPort, workerData } from "worker_threads";
import fetch from "node-fetch";
import path from "path";
import { fileURLToPath } from "url";
const nodeRequire = createRequire(import.meta.url);
const magick = nodeRequire(`../build/${process.env.DEBUG && process.env.DEBUG === "true" ? "Debug" : "Release"}/image.node`);
const relPath = `../build/${process.env.DEBUG && process.env.DEBUG === "true" ? "Debug" : "Release"}/image.node`;
const magick = nodeRequire(relPath);
const enumMap = {
"forget": 0,
@ -42,6 +45,7 @@ export default function run(object) {
objectWithFixedType.gravity = enumMap[objectWithFixedType.gravity];
}
}
objectWithFixedType.basePath = path.join(path.dirname(fileURLToPath(import.meta.url)), "../");
try {
const result = magick[object.cmd](objectWithFixedType);
const returnObject = {

View file

@ -5,6 +5,9 @@ import path from "path";
import { fileURLToPath } from "url";
import { Worker } from "worker_threads";
import { createRequire } from "module";
import { createServer } from "http";
import fetch from "node-fetch";
import EventEmitter from "events";
// only requiring this to work around an issue regarding worker threads
const nodeRequire = createRequire(import.meta.url);
@ -18,11 +21,14 @@ class ImageWorker extends BaseServiceWorker {
console.info = (str) => this.ipc.sendToAdmiral("info", str);
if (process.env.API === "true") {
this.jobs = {};
if (process.env.API_TYPE === "ws") {
this.connections = new Map();
this.servers = JSON.parse(fs.readFileSync(new URL("../../servers.json", import.meta.url), { encoding: "utf8" })).image;
this.nextID = 0;
} else if (process.env.API_TYPE === "azure") {
this.jobs = new Map();
this.webhook = createServer();
this.port = parseInt(process.env.WEBHOOK_PORT) || 3763;
}
this.begin().then(() => this.serviceReady());
@ -30,7 +36,7 @@ class ImageWorker extends BaseServiceWorker {
async begin() {
// connect to image api if enabled
if (process.env.API === "true") {
if (process.env.API_TYPE === "ws") {
for (const server of this.servers) {
try {
await this.connect(server.server, server.auth);
@ -38,6 +44,73 @@ class ImageWorker extends BaseServiceWorker {
logger.error(e);
}
}
} else if (process.env.API_TYPE === "azure") {
this.webhook.on("request", async (req, res) => {
if (req.method !== "POST") {
res.statusCode = 405;
return res.end("405 Method Not Allowed");
}
if (process.env.AZURE_PASS && req.headers.authorization !== process.env.AZURE_PASS) {
res.statusCode = 401;
return res.end("401 Unauthorized");
}
const reqUrl = new URL(req.url, `http://${req.headers.host}`);
if (reqUrl.pathname === "/callback") {
try {
const chunks = [];
req.on("data", (data) => {
chunks.push(data);
});
req.once("end", () => {
if (this.jobs.has(req.headers["x-azure-id"])) {
try {
const error = JSON.parse(Buffer.concat(chunks).toString());
if (error.error) this.jobs.get(req.headers["x-azure-id"]).emit("error", new Error(error.message));
} catch {
// no-op
}
const contentType = req.headers["content-type"];
let type;
switch (contentType) {
case "image/gif":
type = "gif";
break;
case "image/png":
type = "png";
break;
case "image/jpeg":
type = "jpg";
break;
case "image/webp":
type = "webp";
break;
default:
type = contentType;
break;
}
this.jobs.get(req.headers["x-azure-id"]).emit("image", { buffer: Buffer.concat(chunks), type });
return res.end("OK");
} else {
res.statusCode = 409;
return res.end("409 Conflict");
}
});
} catch (e) {
logger.error("An error occurred while processing a webhook request: ", e);
res.statusCode = 500;
return res.end("500 Internal Server Error");
}
} else {
res.statusCode = 404;
return res.end("404 Not Found");
}
});
this.webhook.on("error", (e) => {
logger.error("An error occurred on the Azure webhook: ", e);
});
this.webhook.listen(this.port, () => {
logger.log(`Azure HTTP webhook listening on port ${this.port}`);
});
}
}
@ -49,7 +122,7 @@ class ImageWorker extends BaseServiceWorker {
async getRunning() {
const statuses = [];
if (process.env.API === "true") {
if (process.env.API_TYPE === "ws") {
for (const [address, connection] of this.connections) {
if (connection.conn.readyState !== 0 && connection.conn.readyState !== 1) {
continue;
@ -115,8 +188,17 @@ class ImageWorker extends BaseServiceWorker {
});
}
waitForAzure(event) {
return new Promise((resolve, reject) => {
event.once("image", (data) => {
resolve(data);
});
event.once("error", reject);
});
}
async run(object) {
if (process.env.API === "true") {
if (process.env.API_TYPE === "ws") {
let num = this.nextID++;
if (num > 4294967295) num = this.nextID = 0;
for (let i = 0; i < 3; i++) {
@ -135,6 +217,12 @@ class ImageWorker extends BaseServiceWorker {
}
}
}
} else if (process.env.API_TYPE === "azure") {
object.callback = `${process.env.AZURE_CALLBACK_URL}:${this.port}/callback`;
const response = await fetch(`${process.env.AZURE_URL}/api/orchestrators/ImageOrchestrator`, { method: "POST", body: JSON.stringify(object) }).then(r => r.json());
const event = new EventEmitter();
this.jobs.set(response.id, event);
return await this.waitForAzure(event);
} else {
// Called from command (not using image API)
const worker = new Worker(path.join(path.dirname(fileURLToPath(import.meta.url)), "../image-runner.js"), {

View file

@ -22,7 +22,7 @@ class PrometheusWorker extends BaseServiceWorker {
# HELP esmbot_shard_count Number of shards the bot has
# TYPE esmbot_shard_count gauge
`);
if (process.env.API === "true") {
if (process.env.API_TYPE === "ws") {
const servers = await this.ipc.serviceCommand("image", { type: "stats" }, true);
res.write(`# HELP esmbot_connected_workers Number of workers connected
# TYPE esmbot_connected_workers gauge