diff --git a/src/db/redis.ts b/src/db/redis.ts index 41d030149..7570aa59b 100644 --- a/src/db/redis.ts +++ b/src/db/redis.ts @@ -1,7 +1,7 @@ import * as redis from 'redis'; import config from '../config'; -export default redis.createClient( +const client = redis.createClient( config.redis.port, config.redis.host, { @@ -10,3 +10,7 @@ export default redis.createClient( db: config.redis.db || 0 } ); + +client.subscribe(config.host); + +export default client; diff --git a/src/server/api/streaming.ts b/src/server/api/streaming.ts index 10f2a70ba..4ec0c5137 100644 --- a/src/server/api/streaming.ts +++ b/src/server/api/streaming.ts @@ -1,12 +1,11 @@ import * as http from 'http'; import * as websocket from 'websocket'; -import * as redis from 'redis'; import MainStreamConnection from './stream'; import { ParsedUrlQuery } from 'querystring'; import authenticate from './authenticate'; import { EventEmitter } from 'events'; -import config from '../../config'; +import redisClient from '../../db/redis'; module.exports = (server: http.Server) => { // Init websocket server @@ -24,37 +23,21 @@ module.exports = (server: http.Server) => { const connection = request.accept(); - let ev: EventEmitter; + const ev = new EventEmitter(); - // Connect to Redis - const subscriber = redis.createClient( - config.redis.port, - config.redis.host, - { - password: config.redis.pass - } - ); + async function onRedisMessage(_: string, data: string) { + const parsed = JSON.parse(data); + ev.emit(parsed.channel, parsed.message); + } - subscriber.subscribe(config.host); - - ev = new EventEmitter(); - - subscriber.on('message', async (_, data) => { - const obj = JSON.parse(data); - - ev.emit(obj.channel, obj.message); - }); - - connection.once('close', () => { - subscriber.unsubscribe(); - subscriber.quit(); - }); + redisClient.on('message', onRedisMessage); const main = new MainStreamConnection(connection, ev, user, app); connection.once('close', () => { ev.removeAllListeners(); main.dispose(); + redisClient.off('message', onRedisMessage); }); connection.on('message', async (data) => {