2021-08-19 14:19:14 +00:00
import { config } from "dotenv" ;
config ( ) ;
import { cpus } from "os" ;
import { Worker } from "worker_threads" ;
import { join } from "path" ;
import { createServer } from "http" ;
2021-11-27 05:05:09 +00:00
import { WebSocketServer } from "ws" ;
2021-08-19 14:19:14 +00:00
import { fileURLToPath } from "url" ;
import { dirname } from "path" ;
2021-11-27 05:05:09 +00:00
import { createRequire } from "module" ;
// Native `.node` addons are loaded through a CommonJS-style require that is
// resolved relative to this module's URL.
const nodeRequire = createRequire(import.meta.url);
// DEBUG=true selects the Debug build of the addon; anything else uses Release.
const buildType = process.env.DEBUG === "true" ? "Debug" : "Release";
const magick = nodeRequire(`../build/${buildType}/image.node`);
/*
 * Opcodes carried in the first byte of every WebSocket frame.
 * T* opcodes are read from incoming client messages;
 * R* opcodes are written into the frames this server sends back.
 *
 * Rerror   0x01
 * Tqueue   0x02
 * Rqueue   0x03
 * Tcancel  0x04
 * Rcancel  0x05
 * Twait    0x06
 * Rwait    0x07
 * Rinit    0x08
 */
const Rerror = 0x01;
const Tqueue = 0x02;
const Rqueue = 0x03;
const Tcancel = 0x04;
const Rcancel = 0x05;
const Twait = 0x06;
const Rwait = 0x07;
const Rinit = 0x08;
2020-08-31 22:15:34 +00:00
2020-12-26 02:27:45 +00:00
// High-resolution reference point taken at startup; log timestamps are relative to it.
const start = process.hrtime();

/**
 * Converts a `[seconds, nanoseconds]` tuple from `process.hrtime` to milliseconds.
 * Both components are used so the value keeps growing instead of wrapping
 * around every second.
 * @param {[number, number]} hrtime - tuple as returned by `process.hrtime`
 * @returns {number} the duration in milliseconds
 */
const hrtimeToMs = ([seconds, nanoseconds]) => seconds * 1000 + nanoseconds / 1000000;

/**
 * Writes a line to stdout prefixed with the milliseconds elapsed since startup
 * and, when given, the job number.
 * @param {string} msg - text to log
 * @param {number} [jobNum] - job number appended to the prefix as `:<jobNum>`
 */
const log = (msg, jobNum) => {
  const elapsed = hrtimeToMs(process.hrtime(start));
  const jobSuffix = jobNum !== undefined ? `:${jobNum}` : "";
  console.log(`[${elapsed}${jobSuffix}]\t ${msg}`);
};
2020-11-05 21:40:18 +00:00
2021-04-14 23:02:03 +00:00
/**
 * Map of job ID -> job record that schedules a cleanup check for every entry
 * stored through `set`.
 *
 * Five minutes after an entry is stored, it is removed if it is still the
 * current value for its key and the job has settled (it carries either
 * `data` or `error`). Jobs that are still running are left alone.
 */
class JobCache extends Map {
  /**
   * Stores a job record and schedules its expiry check.
   * @param {*} key - job ID
   * @param {object} value - job record
   * @returns {this} the cache itself, matching the `Map.prototype.set` contract
   */
  set(key, value) {
    super.set(key, value);
    const timer = setTimeout(() => {
      const isCurrent = super.has(key) && this.get(key) === value;
      // Failed jobs nobody collected must expire too, otherwise they leak forever.
      if (isCurrent && (value.data || value.error)) super.delete(key);
    }, 300000); // delete jobs if not requested after 5 minutes
    // A pending cleanup check alone should not keep the process alive.
    timer.unref();
    return this;
  }
}
const jobs = new JobCache();
2021-11-27 05:05:09 +00:00
// Entries in `jobs` should look like ID : { msg: "request", num: <job number> }
// Array of IDs of jobs that have been submitted but not started yet
const queue = [];

/**
 * Parses the configured job limit.
 * @param {string|undefined} raw - raw setting value (e.g. from the environment)
 * @param {number} fallback - value used when `raw` is missing or not a positive integer
 * @returns {number} a positive integer no larger than 0xffff; the cap exists
 *   because the limit is written into the init frame as an unsigned 16-bit value
 */
const parseJobLimit = (raw, fallback) => {
  const parsed = Number.parseInt(raw, 10);
  if (!Number.isInteger(parsed) || parsed <= 0) return fallback;
  return Math.min(parsed, 0xffff);
};

const MAX_JOBS = parseJobLimit(process.env.JOBS, Math.max(cpus().length, 1) * 4); // Completely arbitrary, should usually be some multiple of your amount of cores
// Shared secret expected in the `authentication` request header; unset or empty disables the check
const PASS = process.env.PASS !== "" && process.env.PASS !== undefined ? process.env.PASS : undefined;
// Number of jobs currently running
let jobAmount = 0;
2021-11-27 05:05:09 +00:00
/**
 * Starts the job with the given ID and, once it settles, starts the next
 * queued job that is still tracked in `jobs`.
 *
 * A failure is either stored on the job record (when no wait tag has been
 * attached yet) or reported to the socket as an Rerror frame.
 *
 * NOTE(review): the next queued job is started with the socket of the job that
 * just finished; with several connected clients that may not be the socket
 * that submitted it. Confirm whether the submitting socket should be stored
 * on the job record instead.
 *
 * @param {number} id - job ID, expected to be present in `jobs`
 * @param {object} sock - WebSocket used to report the outcome
 * @returns {Promise<void>} settles after the job has finished or failed
 */
const acceptJob = (id, sock) => {
  // Remove exactly this ID from the queue; it is not necessarily at the head.
  const position = queue.indexOf(id);
  if (position !== -1) queue.splice(position, 1);

  const job = jobs.get(id);
  if (!job) {
    // The record was removed (cancelled or expired) before the job could start.
    log(`Job ${id} is no longer available, skipping`);
    return Promise.resolve();
  }

  jobAmount++;
  return runJob({
    id,
    msg: job.msg,
    num: job.num
  }, sock).then(() => {
    log(`Job ${id} has finished`);
  }).catch((err) => {
    console.error(`Error on job ${id}:`, err);
    const message = String(err?.message ?? err);
    const failedJob = jobs.get(id);
    // The record may have been removed while the job was running.
    if (!failedJob) return;
    if (!failedJob.tag) {
      // Nobody is waiting yet: keep the error so a later wait request can report it.
      failedJob.error = message;
      jobs.set(id, failedJob);
      return;
    }
    jobs.delete(id);
    sock.send(Buffer.concat([Buffer.from([Rerror]), failedJob.tag, Buffer.from(message)]));
  }).finally(() => {
    jobAmount--;
    // Skip stale queue entries (holes, or IDs whose record is gone) and start
    // the first job that can still run.
    while (queue.length > 0) {
      const nextId = queue[0];
      if (nextId !== undefined && jobs.has(nextId)) {
        acceptJob(nextId, sock);
        break;
      }
      queue.shift();
    }
  });
};
2020-11-17 14:52:12 +00:00
2021-11-27 05:05:09 +00:00
// WebSocket server without its own listener; connections are handed over by
// the HTTP server's "upgrade" handler.
const wss = new WebSocketServer({ clientTracking: true, noServer: true });

wss.on("connection", (ws, request) => {
  log(`WS client ${request.socket.remoteAddress}:${request.socket.remotePort} has connected`);

  // Rinit frame: opcode, a zeroed 2-byte tag, MAX_JOBS as uint16 LE, then a
  // JSON object mapping every command exported by the native module to the
  // MIME types listed below.
  const num = Buffer.alloc(2);
  num.writeUInt16LE(MAX_JOBS);
  const formats = {};
  for (const cmd of Object.keys(magick)) {
    formats[cmd] = ["image/png", "image/gif", "image/jpeg", "image/webp"];
  }
  const init = Buffer.concat([Buffer.from([Rinit]), Buffer.from([0x00, 0x00]), num, Buffer.from(JSON.stringify(formats))]);
  ws.send(init);

  ws.on("error", (err) => {
    console.error(err);
  });

  ws.on("message", (msg) => {
    // Every frame needs at least the opcode byte and the 2-byte tag; reading a
    // shorter buffer would throw inside this handler.
    if (!Buffer.isBuffer(msg) || msg.length < 3) {
      log("Could not parse WS message");
      return;
    }
    const opcode = msg.readUInt8(0);
    const tag = msg.subarray(1, 3);
    // Job IDs are a uint32 LE at offset 3.
    const hasId = msg.length >= 7;

    if (opcode === Tqueue) {
      if (!hasId) {
        log("Could not parse WS message");
        return;
      }
      const id = msg.readUInt32LE(3);
      const job = { msg: msg.subarray(7), num: jobAmount };
      jobs.set(id, job);
      queue.push(id);

      ws.send(Buffer.concat([Buffer.from([Rqueue]), tag]));
      if (jobAmount < MAX_JOBS) {
        log(`Got WS request for job ${job.msg} with id ${id}`, job.num);
        acceptJob(id, ws);
      } else {
        log(`Got WS request for job ${job.msg} with id ${id}, queued in position ${queue.indexOf(id)}`, job.num);
      }
    } else if (opcode === Tcancel) {
      // NOTE(review): assumes Tcancel carries the job ID in the same uint32 LE
      // layout as Tqueue/Twait — confirm against the client. Rcancel is always
      // sent, as before, even when no ID could be read.
      if (hasId) {
        const id = msg.readUInt32LE(3);
        const position = queue.indexOf(id);
        if (position !== -1) queue.splice(position, 1);
        jobs.delete(id);
      }
      ws.send(Buffer.concat([Buffer.from([Rcancel]), tag]));
    } else if (opcode === Twait) {
      if (!hasId) {
        log("Could not parse WS message");
        return;
      }
      const id = msg.readUInt32LE(3);
      const job = jobs.get(id);
      if (!job) {
        ws.send(Buffer.concat([Buffer.from([Rerror]), tag, Buffer.from("Invalid job ID")]));
        return;
      }
      if (job.error) {
        jobs.delete(id);
        ws.send(Buffer.concat([Buffer.from([Rerror]), tag, Buffer.from(job.error)]));
        return;
      }
      // Remember the tag so the completion (or failure) can be reported with it.
      job.tag = tag;
      jobs.set(id, job);
      // The job may already have finished before this wait request arrived;
      // answer right away instead of leaving the client waiting.
      if (job.data) {
        ws.send(Buffer.concat([Buffer.from([Rwait]), tag]));
      }
    } else {
      log("Could not parse WS message");
    }
  });

  ws.on("close", () => {
    log(`WS client ${request.socket.remoteAddress}:${request.socket.remotePort} has disconnected`);
  });
});

wss.on("error", (err) => {
  console.error("A WS error occurred: ", err);
});
2021-08-19 14:19:14 +00:00
const httpServer = createServer();

// Content-Type values for the file extensions reported on finished jobs.
const IMAGE_CONTENT_TYPES = new Map([
  ["gif", "image/gif"],
  ["png", "image/png"],
  ["jpeg", "image/jpeg"],
  ["jpg", "image/jpeg"],
  ["webp", "image/webp"]
]);

// Serves `GET /image?id=<job id>`: sends the finished job's data once and then
// forgets the job.
httpServer.on("request", (req, res) => {
  if (req.method !== "GET") {
    res.statusCode = 405;
    return res.end("405 Method Not Allowed");
  }
  if (PASS && req.headers.authentication !== PASS) {
    res.statusCode = 401;
    return res.end("401 Unauthorized");
  }

  let reqUrl;
  try {
    reqUrl = new URL(req.url, `http://${req.headers.host}`);
  } catch {
    // A malformed request target or Host header must not take the server down.
    res.statusCode = 400;
    return res.end("400 Bad Request");
  }

  if (reqUrl.pathname !== "/image") {
    res.statusCode = 404;
    return res.end("404 Not Found");
  }
  if (!reqUrl.searchParams.has("id")) {
    res.statusCode = 400;
    return res.end("400 Bad Request");
  }

  const id = Number.parseInt(reqUrl.searchParams.get("id"), 10);
  const job = jobs.get(id);
  if (!job) {
    res.statusCode = 410;
    return res.end("410 Gone");
  }
  if (job.data === undefined) {
    // The job has no result (yet): keep the record so it can still complete,
    // instead of deleting it and sending an empty body.
    res.statusCode = 404;
    return res.end("404 Not Found");
  }

  log(`Sending image data for job ${id} to ${req.socket.remoteAddress}:${req.socket.remotePort} via HTTP`);
  // Unknown extensions are passed through as the header value unchanged.
  const contentType = IMAGE_CONTENT_TYPES.get(job.ext) ?? job.ext;
  if (contentType) res.setHeader("Content-Type", contentType);

  const data = job.data;
  jobs.delete(id);
  return res.end(data, (err) => {
    if (err) console.error(err);
  });
});
2021-06-18 05:10:11 +00:00
// Hands WebSocket upgrade requests for `/sock` to the WebSocket server; any
// other upgrade request is rejected.
httpServer.on("upgrade", (req, sock, head) => {
  if (PASS && req.headers.authentication !== PASS) {
    sock.write("HTTP/1.1 401 Unauthorized\r\n\r\n");
    sock.destroy();
    return;
  }

  let reqUrl;
  try {
    reqUrl = new URL(req.url, `http://${req.headers.host}`);
  } catch {
    // `new URL` throws on a malformed request target or Host header; without
    // this guard the exception would escape the event handler.
    sock.write("HTTP/1.1 400 Bad Request\r\n\r\n");
    sock.destroy();
    return;
  }

  if (reqUrl.pathname === "/sock") {
    wss.handleUpgrade(req, sock, head, (ws) => {
      wss.emit("connection", ws, req);
    });
  } else {
    sock.destroy();
  }
});
2021-06-18 05:10:11 +00:00
// Server-level failures are only reported on stderr.
httpServer.on("error", function onHttpError(e) {
  console.error("An HTTP error occurred: ", e);
});

// HTTP and WebSocket traffic share this single listener.
const announceListening = () => {
  log("HTTP and WS listening on port 3762");
};
httpServer.listen(3762, announceListening);
2021-01-06 22:10:31 +00:00
2021-11-27 05:05:09 +00:00
/**
 * Runs one image job in a worker thread.
 *
 * The worker's first message is stored on the job record as `data` / `ext`,
 * and an Rwait frame is sent on `ws` if a wait tag has been attached to the
 * record.
 *
 * @param {{id: number, msg: (Buffer|string), num: number}} job - job ID, the
 *   JSON-encoded request, and the job number used for logging
 * @param {object} ws - WebSocket on which completion is announced
 * @returns {Promise<void>} resolves once the result has been stored; rejects
 *   on invalid input, on a worker error, or after the 15 minute timeout
 */
const runJob = (job, ws) => {
  return new Promise((resolve, reject) => {
    log(`Job ${job.id} starting...`, job.num);
    const object = JSON.parse(job.msg);
    // If the image has a path, it must also have a type
    if (object.path && !object.params?.type) {
      reject(new TypeError("Unknown image type"));
      // Stop here: an already rejected job must not spawn a worker.
      return;
    }

    const worker = new Worker(join(dirname(fileURLToPath(import.meta.url)), "../utils/image-runner.js"), {
      workerData: object
    });
    const timeout = setTimeout(() => {
      worker.terminate();
      reject(new Error("Job timed out"));
    }, 900000);
    log(`Job ${job.id} started`, job.num);

    worker.once("message", (data) => {
      clearTimeout(timeout);
      const jobObject = jobs.get(job.id);
      if (!jobObject) {
        // The record was removed while the worker was running; there is
        // nowhere to store the result.
        log(`Job ${job.id} finished but is no longer tracked, discarding result`, job.num);
        resolve();
        return;
      }
      log(`Sending result of job ${job.id} back to the bot`, job.num);
      jobObject.data = data.buffer;
      jobObject.ext = data.fileExtension;
      jobs.set(job.id, jobObject);
      // Without a tag there is no wait request to answer yet; building the
      // frame with an undefined tag would throw.
      if (jobObject.tag) {
        ws.send(Buffer.concat([Buffer.from([Rwait]), jobObject.tag]));
      }
      resolve();
    });
    worker.once("error", (e) => {
      clearTimeout(timeout);
      reject(e);
    });
  });
};