This commit is contained in:
		
							parent
							
								
									3d3309282e
								
							
						
					
					
						commit
						a8c6d0ed90
					
				
					 12 changed files with 91 additions and 29 deletions
				
			
		| 
						 | 
				
			
			@ -13,7 +13,7 @@ import prominence = require('prominence');
 | 
			
		|||
import DriveFile, { getGridFSBucket } from '../models/drive-file';
 | 
			
		||||
import DriveFolder from '../models/drive-folder';
 | 
			
		||||
import serialize from '../serializers/drive-file';
 | 
			
		||||
import event from '../event';
 | 
			
		||||
import event, { publishDriveStream } from '../event';
 | 
			
		||||
import config from '../../conf';
 | 
			
		||||
 | 
			
		||||
const log = debug('misskey:register-drive-file');
 | 
			
		||||
| 
						 | 
				
			
			@ -243,6 +243,7 @@ export default (user: any, file: string | stream.Readable, ...args) => new Promi
 | 
			
		|||
		serialize(file).then(serializedFile => {
 | 
			
		||||
			// Publish drive_file_created event
 | 
			
		||||
			event(user._id, 'drive_file_created', serializedFile);
 | 
			
		||||
			publishDriveStream(user._id, 'file_created', serializedFile);
 | 
			
		||||
 | 
			
		||||
			// Register to search database
 | 
			
		||||
			if (config.elasticsearch.enable) {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -6,7 +6,7 @@ import DriveFolder from '../../../models/drive-folder';
 | 
			
		|||
import DriveFile from '../../../models/drive-file';
 | 
			
		||||
import { validateFileName } from '../../../models/drive-file';
 | 
			
		||||
import serialize from '../../../serializers/drive-file';
 | 
			
		||||
import event from '../../../event';
 | 
			
		||||
import { publishDriveStream } from '../../../event';
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Update a file
 | 
			
		||||
| 
						 | 
				
			
			@ -72,6 +72,6 @@ module.exports = (params, user) => new Promise(async (res, rej) => {
 | 
			
		|||
	// Response
 | 
			
		||||
	res(fileObj);
 | 
			
		||||
 | 
			
		||||
	// Publish drive_file_updated event
 | 
			
		||||
	event(user._id, 'drive_file_updated', fileObj);
 | 
			
		||||
	// Publish file_updated event
 | 
			
		||||
	publishDriveStream(user._id, 'file_updated', fileObj);
 | 
			
		||||
});
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -5,7 +5,7 @@ import $ from 'cafy';
 | 
			
		|||
import DriveFolder from '../../../models/drive-folder';
 | 
			
		||||
import { isValidFolderName } from '../../../models/drive-folder';
 | 
			
		||||
import serialize from '../../../serializers/drive-folder';
 | 
			
		||||
import event from '../../../event';
 | 
			
		||||
import { publishDriveStream } from '../../../event';
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Create drive folder
 | 
			
		||||
| 
						 | 
				
			
			@ -52,6 +52,6 @@ module.exports = (params, user) => new Promise(async (res, rej) => {
 | 
			
		|||
	// Response
 | 
			
		||||
	res(folderObj);
 | 
			
		||||
 | 
			
		||||
	// Publish drive_folder_created event
 | 
			
		||||
	event(user._id, 'drive_folder_created', folderObj);
 | 
			
		||||
	// Publish folder_created event
 | 
			
		||||
	publishDriveStream(user._id, 'folder_created', folderObj);
 | 
			
		||||
});
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -5,7 +5,7 @@ import $ from 'cafy';
 | 
			
		|||
import DriveFolder from '../../../models/drive-folder';
 | 
			
		||||
import { isValidFolderName } from '../../../models/drive-folder';
 | 
			
		||||
import serialize from '../../../serializers/drive-folder';
 | 
			
		||||
import event from '../../../event';
 | 
			
		||||
import { publishDriveStream } from '../../../event';
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Update a folder
 | 
			
		||||
| 
						 | 
				
			
			@ -96,6 +96,6 @@ module.exports = (params, user) => new Promise(async (res, rej) => {
 | 
			
		|||
	// Response
 | 
			
		||||
	res(folderObj);
 | 
			
		||||
 | 
			
		||||
	// Publish drive_folder_updated event
 | 
			
		||||
	event(user._id, 'drive_folder_updated', folderObj);
 | 
			
		||||
	// Publish folder_updated event
 | 
			
		||||
	publishDriveStream(user._id, 'folder_updated', folderObj);
 | 
			
		||||
});
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -17,6 +17,10 @@ class MisskeyEvent {
 | 
			
		|||
		this.publish(`user-stream:${userId}`, type, typeof value === 'undefined' ? null : value);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	public publishDriveStream(userId: ID, type: string, value?: any): void {
 | 
			
		||||
		this.publish(`drive-stream:${userId}`, type, typeof value === 'undefined' ? null : value);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	public publishPostStream(postId: ID, type: string, value?: any): void {
 | 
			
		||||
		this.publish(`post-stream:${postId}`, type, typeof value === 'undefined' ? null : value);
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -46,6 +50,8 @@ const ev = new MisskeyEvent();
 | 
			
		|||
 | 
			
		||||
export default ev.publishUserStream.bind(ev);
 | 
			
		||||
 | 
			
		||||
export const publishDriveStream = ev.publishDriveStream.bind(ev);
 | 
			
		||||
 | 
			
		||||
export const publishPostStream = ev.publishPostStream.bind(ev);
 | 
			
		||||
 | 
			
		||||
export const publishMessagingStream = ev.publishMessagingStream.bind(ev);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										10
									
								
								src/api/stream/drive.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										10
									
								
								src/api/stream/drive.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,10 @@
 | 
			
		|||
import * as websocket from 'websocket';
 | 
			
		||||
import * as redis from 'redis';
 | 
			
		||||
 | 
			
		||||
export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void {
 | 
			
		||||
	// Subscribe drive stream
 | 
			
		||||
	subscriber.subscribe(`misskey:drive-stream:${user._id}`);
 | 
			
		||||
	subscriber.on('message', (_, data) => {
 | 
			
		||||
		connection.send(data);
 | 
			
		||||
	});
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -7,6 +7,7 @@ import AccessToken from './models/access-token';
 | 
			
		|||
import isNativeToken from './common/is-native-token';
 | 
			
		||||
 | 
			
		||||
import homeStream from './stream/home';
 | 
			
		||||
import driveStream from './stream/drive';
 | 
			
		||||
import messagingStream from './stream/messaging';
 | 
			
		||||
import messagingIndexStream from './stream/messaging-index';
 | 
			
		||||
import serverStream from './stream/server';
 | 
			
		||||
| 
						 | 
				
			
			@ -58,6 +59,7 @@ module.exports = (server: http.Server) => {
 | 
			
		|||
 | 
			
		||||
		const channel =
 | 
			
		||||
			request.resourceURL.pathname === '/' ? homeStream :
 | 
			
		||||
			request.resourceURL.pathname === '/drive' ? driveStream :
 | 
			
		||||
			request.resourceURL.pathname === '/messaging' ? messagingStream :
 | 
			
		||||
			request.resourceURL.pathname === '/messaging-index' ? messagingIndexStream :
 | 
			
		||||
			null;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -4,6 +4,7 @@ import MiOS from './mios';
 | 
			
		|||
import ServerStreamManager from './scripts/server-stream-manager';
 | 
			
		||||
import RequestsStreamManager from './scripts/requests-stream-manager';
 | 
			
		||||
import MessagingIndexStream from './scripts/messaging-index-stream-manager';
 | 
			
		||||
import DriveStreamManager from './scripts/drive-stream-manager';
 | 
			
		||||
 | 
			
		||||
export default (mios: MiOS) => {
 | 
			
		||||
	(riot as any).mixin('os', {
 | 
			
		||||
| 
						 | 
				
			
			@ -30,7 +31,7 @@ export default (mios: MiOS) => {
 | 
			
		|||
	(riot as any).mixin('api', {
 | 
			
		||||
		api: mios.api
 | 
			
		||||
	});
 | 
			
		||||
 | 
			
		||||
	(riot as any).mixin('drive-stream', { driveStream: new DriveStreamManager(mios.i) });
 | 
			
		||||
	(riot as any).mixin('stream', { stream: mios.stream });
 | 
			
		||||
 | 
			
		||||
	(riot as any).mixin('server-stream', { serverStream: new ServerStreamManager() });
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										20
									
								
								src/web/app/common/scripts/drive-stream-manager.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										20
									
								
								src/web/app/common/scripts/drive-stream-manager.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,20 @@
 | 
			
		|||
import StreamManager from './stream-manager';
 | 
			
		||||
import Connection from './drive-stream';
 | 
			
		||||
 | 
			
		||||
export default class DriveStreamManager extends StreamManager<Connection> {
 | 
			
		||||
	private me;
 | 
			
		||||
 | 
			
		||||
	constructor(me) {
 | 
			
		||||
		super();
 | 
			
		||||
 | 
			
		||||
		this.me = me;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	public getConnection() {
 | 
			
		||||
		if (this.connection == null) {
 | 
			
		||||
			this.connection = new Connection(this.me);
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return this.connection;
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										14
									
								
								src/web/app/common/scripts/drive-stream.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										14
									
								
								src/web/app/common/scripts/drive-stream.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,14 @@
 | 
			
		|||
import Stream from './stream';
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Drive stream connection
 | 
			
		||||
 */
 | 
			
		||||
class Connection extends Stream {
 | 
			
		||||
	constructor(me) {
 | 
			
		||||
		super('drive', {
 | 
			
		||||
			i: me.token
 | 
			
		||||
		});
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export default Connection;
 | 
			
		||||
| 
						 | 
				
			
			@ -247,7 +247,10 @@
 | 
			
		|||
 | 
			
		||||
		this.mixin('i');
 | 
			
		||||
		this.mixin('api');
 | 
			
		||||
		this.mixin('stream');
 | 
			
		||||
 | 
			
		||||
		this.mixin('drive-stream');
 | 
			
		||||
		this.connection = this.driveStream.getConnection();
 | 
			
		||||
		this.connectionId = this.driveStream.use();
 | 
			
		||||
 | 
			
		||||
		this.files = [];
 | 
			
		||||
		this.folders = [];
 | 
			
		||||
| 
						 | 
				
			
			@ -280,10 +283,10 @@
 | 
			
		|||
				});
 | 
			
		||||
			});
 | 
			
		||||
 | 
			
		||||
			this.stream.on('drive_file_created', this.onStreamDriveFileCreated);
 | 
			
		||||
			this.stream.on('drive_file_updated', this.onStreamDriveFileUpdated);
 | 
			
		||||
			this.stream.on('drive_folder_created', this.onStreamDriveFolderCreated);
 | 
			
		||||
			this.stream.on('drive_folder_updated', this.onStreamDriveFolderUpdated);
 | 
			
		||||
			this.connection.on('file_created', this.onStreamDriveFileCreated);
 | 
			
		||||
			this.connection.on('file_updated', this.onStreamDriveFileUpdated);
 | 
			
		||||
			this.connection.on('folder_created', this.onStreamDriveFolderCreated);
 | 
			
		||||
			this.connection.on('folder_updated', this.onStreamDriveFolderUpdated);
 | 
			
		||||
 | 
			
		||||
			if (this.opts.folder) {
 | 
			
		||||
				this.move(this.opts.folder);
 | 
			
		||||
| 
						 | 
				
			
			@ -293,10 +296,11 @@
 | 
			
		|||
		});
 | 
			
		||||
 | 
			
		||||
		this.on('unmount', () => {
 | 
			
		||||
			this.stream.off('drive_file_created', this.onStreamDriveFileCreated);
 | 
			
		||||
			this.stream.off('drive_file_updated', this.onStreamDriveFileUpdated);
 | 
			
		||||
			this.stream.off('drive_folder_created', this.onStreamDriveFolderCreated);
 | 
			
		||||
			this.stream.off('drive_folder_updated', this.onStreamDriveFolderUpdated);
 | 
			
		||||
			this.connection.off('file_created', this.onStreamDriveFileCreated);
 | 
			
		||||
			this.connection.off('file_updated', this.onStreamDriveFileUpdated);
 | 
			
		||||
			this.connection.off('folder_created', this.onStreamDriveFolderCreated);
 | 
			
		||||
			this.connection.off('folder_updated', this.onStreamDriveFolderUpdated);
 | 
			
		||||
			this.driveStream.dispose(this.connectionId);
 | 
			
		||||
		});
 | 
			
		||||
 | 
			
		||||
		this.onStreamDriveFileCreated = file => {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -172,7 +172,10 @@
 | 
			
		|||
	<script>
 | 
			
		||||
		this.mixin('i');
 | 
			
		||||
		this.mixin('api');
 | 
			
		||||
		this.mixin('stream');
 | 
			
		||||
 | 
			
		||||
		this.mixin('drive-stream');
 | 
			
		||||
		this.connection = this.driveStream.getConnection();
 | 
			
		||||
		this.connectionId = this.driveStream.use();
 | 
			
		||||
 | 
			
		||||
		this.files = [];
 | 
			
		||||
		this.folders = [];
 | 
			
		||||
| 
						 | 
				
			
			@ -189,10 +192,10 @@
 | 
			
		|||
		this.multiple = this.opts.multiple;
 | 
			
		||||
 | 
			
		||||
		this.on('mount', () => {
 | 
			
		||||
			this.stream.on('drive_file_created', this.onStreamDriveFileCreated);
 | 
			
		||||
			this.stream.on('drive_file_updated', this.onStreamDriveFileUpdated);
 | 
			
		||||
			this.stream.on('drive_folder_created', this.onStreamDriveFolderCreated);
 | 
			
		||||
			this.stream.on('drive_folder_updated', this.onStreamDriveFolderUpdated);
 | 
			
		||||
			this.connection.on('file_created', this.onStreamDriveFileCreated);
 | 
			
		||||
			this.connection.on('file_updated', this.onStreamDriveFileUpdated);
 | 
			
		||||
			this.connection.on('folder_created', this.onStreamDriveFolderCreated);
 | 
			
		||||
			this.connection.on('folder_updated', this.onStreamDriveFolderUpdated);
 | 
			
		||||
 | 
			
		||||
			if (this.opts.folder) {
 | 
			
		||||
				this.cd(this.opts.folder, true);
 | 
			
		||||
| 
						 | 
				
			
			@ -208,10 +211,11 @@
 | 
			
		|||
		});
 | 
			
		||||
 | 
			
		||||
		this.on('unmount', () => {
 | 
			
		||||
			this.stream.off('drive_file_created', this.onStreamDriveFileCreated);
 | 
			
		||||
			this.stream.off('drive_file_updated', this.onStreamDriveFileUpdated);
 | 
			
		||||
			this.stream.off('drive_folder_created', this.onStreamDriveFolderCreated);
 | 
			
		||||
			this.stream.off('drive_folder_updated', this.onStreamDriveFolderUpdated);
 | 
			
		||||
			this.connection.off('file_created', this.onStreamDriveFileCreated);
 | 
			
		||||
			this.connection.off('file_updated', this.onStreamDriveFileUpdated);
 | 
			
		||||
			this.connection.off('folder_created', this.onStreamDriveFolderCreated);
 | 
			
		||||
			this.connection.off('folder_updated', this.onStreamDriveFolderUpdated);
 | 
			
		||||
			this.driveStream.dispose(this.connectionId);
 | 
			
		||||
		});
 | 
			
		||||
 | 
			
		||||
		this.onStreamDriveFileCreated = file => {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue