2021-01-03 04:40:44 +00:00
// code originally provided by tzlil
2020-11-17 14:52:12 +00:00
2020-12-03 16:30:33 +00:00
require ( "dotenv" ) . config ( ) ;
2020-11-17 14:52:12 +00:00
const os = require ( "os" ) ;
const { Worker , isMainThread , parentPort } = require ( "worker_threads" ) ;
2020-08-31 22:15:34 +00:00
const magick = require ( "../utils/image.js" ) ;
2020-10-18 21:53:35 +00:00
const execPromise = require ( "util" ) . promisify ( require ( "child_process" ) . exec ) ;
2020-11-17 14:52:12 +00:00
const net = require ( "net" ) ;
const dgram = require ( "dgram" ) ; // for UDP servers
const socket = dgram . createSocket ( "udp4" ) ; // Our universal UDP socket, this might cause issues and we may have to use a seperate socket for each connection
2020-08-31 22:15:34 +00:00
2020-12-26 02:27:45 +00:00
const start = process . hrtime ( ) ;
2020-11-17 14:52:12 +00:00
// Print `msg` to stdout, prefixed with the number of milliseconds elapsed
// since `start` was captured and, when `thread` is truthy, the thread label.
//   msg    - text to print
//   thread - optional worker/thread number; omitted from the prefix when falsy
const log = (msg, thread) => {
  // process.hrtime(start) returns [seconds, nanoseconds]; both parts are needed,
  // otherwise the printed value wraps back to 0 every second.
  const [seconds, nanoseconds] = process.hrtime(start);
  const elapsedMs = seconds * 1000 + nanoseconds / 1000000;
  console.log(` [ ${elapsedMs} ${(thread) ? ` : ${thread} ` : ""} ] \t ${msg} `);
};
2020-11-05 21:40:18 +00:00
2020-11-17 14:52:12 +00:00
// Job records keyed by UUID. Each entry looks like:
//   UUID: { addr: "someaddr", port: someport, msg: "request" }
const jobs = { } ;
// UUIDs of jobs waiting for a worker, in arrival order (appended with push(), taken with shift()).
const queue = [ ] ;
2020-08-31 22:15:34 +00:00
2020-11-17 14:52:12 +00:00
if ( isMainThread ) {
const { v4 : uuidv4 } = require ( "uuid" ) ;
2021-01-03 05:02:41 +00:00
// Latest CPU load estimate (0-100); refreshed by the 5 second sampling interval and sent to clients that request opcode 0x2.
let cpuLoad = 0 ;
2020-12-26 02:27:45 +00:00
// Take a snapshot of the cumulative CPU time counters reported by os.cpus().
// Returns { idle, tick }: the idle time and the total time (all counters summed),
// each averaged over the number of cores.
const getAverage = () => {
  const cores = os.cpus();
  const totals = cores.reduce(
    (acc, { times }) => {
      // Every counter (user, sys, idle, ...) contributes to the total tick count
      for (const amount of Object.values(times)) {
        acc.tick += amount;
      }
      acc.idle += times.idle;
      return acc;
    },
    { idle: 0, tick: 0 }
  );
  return {
    idle: totals.idle / cores.length,
    tick: totals.tick / cores.length
  };
};
// Re-sample the CPU counters every 5 seconds. The load is the share of
// non-idle time in the interval between the previous and the current sample.
let measure = getAverage();
setInterval(() => {
  const latest = getAverage();
  const idleDelta = latest.idle - measure.idle;
  const tickDelta = latest.tick - measure.tick;
  measure = latest;
  // `| 0` truncates the idle percentage to an integer (and maps NaN to 0)
  const idlePercent = ((100 * idleDelta) / tickDelta) | 0;
  cpuLoad = 100 - idlePercent;
}, 5000);
2020-10-06 19:48:22 +00:00
2020-11-17 14:52:12 +00:00
// Turn the raw WORKERS setting into a usable worker limit.
// Returns the parsed value when it is a positive integer, otherwise `fallback`
// (covers an unset variable, an empty string and non-numeric input).
const parseWorkerLimit = (raw, fallback) => {
  const parsed = Number.parseInt(raw, 10);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
};
// Upper bound on concurrently running workers. The default of four per core is
// completely arbitrary; it should usually be some multiple of your amount of cores.
// os.cpus() may be empty when CPU information is unavailable, so assume at least one core.
const MAX_WORKERS = parseWorkerLimit(process.env.WORKERS, Math.max(1, os.cpus().length) * 4);
// Number of workers currently occupying a slot
let workingWorkers = 0;
2020-11-05 21:40:18 +00:00
2020-11-17 14:52:12 +00:00
// Start a worker thread for the job identified by `uuid`.
//
// The job is removed from the waiting queue, a worker running this same file
// is spawned, and the job details are posted to it. The worker slot is given
// back exactly once, when the worker either reports completion ("message")
// or exits, whichever happens first; the next queued job (if any) is then started.
// If the worker fails, an error datagram (opcode 0x2 + uuid + error text) is
// sent to the address that requested the job.
const acceptJob = (uuid) => {
  // Remove this specific job from the queue; it is not necessarily the head
  const position = queue.indexOf(uuid);
  if (position !== -1) queue.splice(position, 1);

  const job = jobs[uuid];
  if (!job) {
    // The job record is gone (e.g. it was cancelled while waiting), so there is
    // nothing to run; hand the free slot to the next waiting job instead.
    if (queue.length > 0) acceptJob(queue[0]);
    return;
  }

  workingWorkers++;
  const worker = new Worker(__filename);
  log(` Spawned worker ${uuid} `);

  // A successful worker emits "message" and later "exit"; a failed one emits
  // "error" and then "exit". Guard so the slot is only released once per worker.
  let slotReleased = false;
  const releaseSlot = () => {
    if (slotReleased) return;
    slotReleased = true;
    workingWorkers--;
    delete jobs[uuid];
    if (queue.length > 0) {
      acceptJob(queue[0]); // Start the next waiting job
    }
  };

  // The worker posts its uuid once the job is finished
  worker.once("message", releaseSlot);
  worker.once("error", (err) => {
    console.error(` Error on worker ${uuid} : `, err);
    // Use the captured job record: the entry in `jobs` may already be deleted
    const payload = Buffer.concat([Buffer.from([0x2]), Buffer.from(uuid), Buffer.from(err.toString())]);
    socket.send(payload, job.port, job.addr);
  });
  worker.once("exit", (code) => {
    releaseSlot();
    if (code === 0) {
      log(` Worker ${uuid} successfully exited `);
    } else {
      console.error(` Worker ${uuid} stopped with exit code ${code} `);
    }
  });

  worker.postMessage({
    uuid: uuid,
    msg: job.msg,
    addr: job.addr,
    port: job.port,
    threadNum: workingWorkers
  });
};
// UDP server that listens for requests; TCP is not needed for the control channel.
// The first byte of every datagram is an opcode, the rest is the request body:
//   0x0 == Cancel job     (body: job UUID)
//   0x1 == Queue job      (body: job request; answered with 0x0 + the new UUID)
//   0x2 == Get CPU usage  (answered with 0x3 + the current load as text)
const server = dgram.createSocket("udp4");
server.on("message", (msg, rinfo) => {
  // An empty datagram has no opcode byte; reading it would throw
  if (msg.length === 0) {
    log("Could not parse message");
    return;
  }
  const opcode = msg.readUInt8(0);
  const req = msg.toString("utf8", 1);
  switch (opcode) {
    case 0x0: {
      // Drop the cancelled job itself, not whichever job is at the head of the queue
      const position = queue.indexOf(req);
      if (position !== -1) queue.splice(position, 1);
      delete jobs[req];
      break;
    }
    case 0x1: {
      const job = { addr: rinfo.address, port: rinfo.port, msg: req };
      const uuid = uuidv4();
      jobs[uuid] = job;
      queue.push(uuid);
      if (workingWorkers < MAX_WORKERS) {
        log(` Got request for job ${job.msg} with id ${uuid} `);
        acceptJob(uuid);
      } else {
        // The job was just appended, so its 1-based position equals the queue length
        log(` Got request for job ${job.msg} with id ${uuid} , queued in position ${queue.length} `);
      }
      // Tell the requester which UUID its job was given
      const newBuffer = Buffer.concat([Buffer.from([0x0]), Buffer.from(uuid)]);
      socket.send(newBuffer, rinfo.port, rinfo.address);
      break;
    }
    case 0x2:
      socket.send(Buffer.concat([Buffer.from([0x3]), Buffer.from(cpuLoad.toString())]), rinfo.port, rinfo.address);
      break;
    default:
      log("Could not parse message");
  }
});
server.on("listening", () => {
  const address = server.address();
  log(` server listening ${address.address} : ${address.port} `);
});
server.bind(8080); // ATTENTION: Always going to be bound to 0.0.0.0 !!!
} else {
// Worker entry point: handle the single job posted by the main thread.
//
// `job` is { uuid, msg, addr, port, threadNum }, where `msg` is the JSON request.
// The request is processed with magick.run(), the result is offered on a
// one-shot TCP server bound to job.port/job.addr, the requester is notified
// over UDP (opcode 0x1 + uuid + port), and finally the main thread is told
// that this worker is done. Any failure (invalid JSON, unknown image type,
// non-GIF input when onlyGIF is set, ffprobe failure) rejects this handler.
parentPort.once("message", async (job) => {
  log(` ${job.uuid} worker started `, job.threadNum);

  const object = JSON.parse(job.msg);
  let type;
  if (object.path) {
    // Prefer the type supplied in the request, otherwise detect it from the file
    type = object.type || (await magick.getType(object.path));
    if (!type) {
      throw new TypeError("Unknown image type");
    }
    // Keep only the subtype, e.g. "image/gif" -> "gif"
    object.type = type.split("/")[1];
    if (object.type !== "gif" && object.onlyGIF) throw new TypeError(` Expected a GIF, got ${object.type} `);
    object.delay = object.delay || 0;
  }
  if (object.type === "gif" && !object.delay) {
    // object.path comes from the network: pass it as a separate argument via
    // execFile so it is never interpreted by a shell.
    const execFilePromise = require("util").promisify(require("child_process").execFile);
    const { stdout } = await execFilePromise("ffprobe", [
      "-v", "0",
      "-of", "csv=p=0",
      "-select_streams", "v:0",
      "-show_entries", "stream=r_frame_rate",
      object.path
    ]);
    // ffprobe prints the frame rate as a fraction "num/den"
    const [numerator, denominator] = stdout.trim().split("/");
    object.delay = (100 / Number(numerator)) * Number(denominator);
  }

  const data = await magick.run(object, true);

  log(` ${job.uuid} is done `, job.threadNum);
  // Serve the result once: "<mime type>\n<image bytes>", then shut the server down
  const server = net.createServer((tcpSocket) => {
    const payload = Buffer.concat([Buffer.from(type ? type : "image/png"), Buffer.from("\n"), data]);
    tcpSocket.write(payload, (err) => {
      if (err) console.error(err);
      tcpSocket.end(() => {
        server.close();
      });
    });
  });
  server.on("error", (e) => {
    if (e.code === "EADDRINUSE") {
      // handle address in use errors by retrying shortly afterwards
      console.log("Address in use, retrying...");
      setTimeout(() => {
        server.close();
        server.listen(job.port, job.addr);
      }, 500);
    } else {
      // Do not silently drop other server errors
      console.error(e);
    }
  });
  server.listen(job.port, job.addr);
  // Notify the requester that the result is ready, then close the UDP socket
  const notice = Buffer.concat([Buffer.from([0x1]), Buffer.from(job.uuid), Buffer.from(job.port.toString())]);
  socket.send(notice, job.port, job.addr, () => {
    socket.close();
  });
  parentPort.postMessage(job.uuid); // Inform main thread about this worker freeing up
});
2020-11-17 14:52:12 +00:00
}