Redisがインストールされているときはイベントの共有にRedisのpub/subを使うように
This commit is contained in:
		
							parent
							
								
									f06d586680
								
							
						
					
					
						commit
						05f9ad11bb
					
				
					 3 changed files with 46 additions and 7 deletions
				
			
		|  | @ -1,6 +1,5 @@ | |||
| import autobind from 'autobind-decorator'; | ||||
| import * as websocket from 'websocket'; | ||||
| import Xev from 'xev'; | ||||
| import * as debug from 'debug'; | ||||
| 
 | ||||
| import User, { IUser } from '../../../models/user'; | ||||
|  | @ -11,6 +10,7 @@ import readNote from '../../../services/note/read'; | |||
| 
 | ||||
| import Channel from './channel'; | ||||
| import channels from './channels'; | ||||
| import { EventEmitter } from 'events'; | ||||
| 
 | ||||
| const log = debug('misskey'); | ||||
| 
 | ||||
|  | @ -21,14 +21,14 @@ export default class Connection { | |||
| 	public user?: IUser; | ||||
| 	public app: IApp; | ||||
| 	private wsConnection: websocket.connection; | ||||
| 	public subscriber: Xev; | ||||
| 	public subscriber: EventEmitter; | ||||
| 	private channels: Channel[] = []; | ||||
| 	private subscribingNotes: any = {}; | ||||
| 	public sendMessageToWsOverride: any = null; // 後方互換性のため
 | ||||
| 
 | ||||
| 	constructor( | ||||
| 		wsConnection: websocket.connection, | ||||
| 		subscriber: Xev, | ||||
| 		subscriber: EventEmitter, | ||||
| 		user: IUser, | ||||
| 		app: IApp | ||||
| 	) { | ||||
|  |  | |||
|  | @ -1,11 +1,14 @@ | |||
| import * as http from 'http'; | ||||
| import * as websocket from 'websocket'; | ||||
| import * as redis from 'redis'; | ||||
| import Xev from 'xev'; | ||||
| 
 | ||||
| import MainStreamConnection from './stream'; | ||||
| import { ParsedUrlQuery } from 'querystring'; | ||||
| import authenticate from './authenticate'; | ||||
| import channels from './stream/channels'; | ||||
| import { EventEmitter } from 'events'; | ||||
| import config from '../../config'; | ||||
| 
 | ||||
| module.exports = (server: http.Server) => { | ||||
| 	// Init websocket server
 | ||||
|  | @ -16,11 +19,36 @@ module.exports = (server: http.Server) => { | |||
| 	ws.on('request', async (request) => { | ||||
| 		const connection = request.accept(); | ||||
| 
 | ||||
| 		const ev = new Xev(); | ||||
| 
 | ||||
| 		const q = request.resourceURL.query as ParsedUrlQuery; | ||||
| 		const [user, app] = await authenticate(q.i as string); | ||||
| 
 | ||||
| 		let ev: EventEmitter; | ||||
| 
 | ||||
| 		if (config.redis) { | ||||
| 			// Connect to Redis
 | ||||
| 			const subscriber = redis.createClient( | ||||
| 				config.redis.port, config.redis.host); | ||||
| 
 | ||||
| 			subscriber.subscribe('misskey'); | ||||
| 
 | ||||
| 			ev = new EventEmitter(); | ||||
| 
 | ||||
| 			subscriber.on('message', async (_, data) => { | ||||
| 				const obj = JSON.parse(data); | ||||
| 
 | ||||
| 				console.log(obj); | ||||
| 
 | ||||
| 				ev.emit(obj.channel, obj.message); | ||||
| 			}); | ||||
| 
 | ||||
| 			connection.once('close', () => { | ||||
| 				subscriber.unsubscribe(); | ||||
| 				subscriber.quit(); | ||||
| 			}); | ||||
| 		} else { | ||||
| 			ev = new Xev(); | ||||
| 		} | ||||
| 
 | ||||
| 		const main = new MainStreamConnection(connection, ev, user, app); | ||||
| 
 | ||||
| 		// 後方互換性のため
 | ||||
|  |  | |||
|  | @ -1,4 +1,5 @@ | |||
| import * as mongo from 'mongodb'; | ||||
| import redis from './db/redis'; | ||||
| import Xev from 'xev'; | ||||
| import Meta, { IMeta } from './models/meta'; | ||||
| 
 | ||||
|  | @ -9,7 +10,10 @@ class Publisher { | |||
| 	private meta: IMeta; | ||||
| 
 | ||||
| 	constructor() { | ||||
| 		this.ev = new Xev(); | ||||
| 		// Redisがインストールされてないときはプロセス間通信を使う
 | ||||
| 		if (redis == null) { | ||||
| 			this.ev = new Xev(); | ||||
| 		} | ||||
| 
 | ||||
| 		setInterval(async () => { | ||||
| 			this.meta = await Meta.findOne({}); | ||||
|  | @ -28,7 +32,14 @@ class Publisher { | |||
| 			{ type: type, body: null } : | ||||
| 			{ type: type, body: value }; | ||||
| 
 | ||||
| 		this.ev.emit(channel, message); | ||||
| 		if (this.ev) { | ||||
| 			this.ev.emit(channel, message); | ||||
| 		} else { | ||||
| 			redis.publish('misskey', JSON.stringify({ | ||||
| 				channel: channel, | ||||
| 				message: message | ||||
| 			})); | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	public publishMainStream = (userId: ID, type: string, value?: any): void => { | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue