Talk federation (#5534)
This commit is contained in:
		
							parent
							
								
									1e58b3daff
								
							
						
					
					
						commit
						245b08b624
					
				
					 6 changed files with 125 additions and 74 deletions
				
			
		|  | @ -138,7 +138,7 @@ export default Vue.extend({ | ||||||
| 			} | 			} | ||||||
| 			this.$root.api('users/search', { | 			this.$root.api('users/search', { | ||||||
| 				query: this.q, | 				query: this.q, | ||||||
| 				localOnly: true, | 				localOnly: false, | ||||||
| 				limit: 10, | 				limit: 10, | ||||||
| 				detail: false | 				detail: false | ||||||
| 			}).then(users => { | 			}).then(users => { | ||||||
|  |  | ||||||
|  | @ -23,6 +23,7 @@ import { genId } from '../../../misc/gen-id'; | ||||||
| import { fetchMeta } from '../../../misc/fetch-meta'; | import { fetchMeta } from '../../../misc/fetch-meta'; | ||||||
| import { ensure } from '../../../prelude/ensure'; | import { ensure } from '../../../prelude/ensure'; | ||||||
| import { getApLock } from '../../../misc/app-lock'; | import { getApLock } from '../../../misc/app-lock'; | ||||||
|  | import { createMessage } from '../../../services/messages/create'; | ||||||
| 
 | 
 | ||||||
| const logger = apLogger; | const logger = apLogger; | ||||||
| 
 | 
 | ||||||
|  | @ -223,6 +224,13 @@ export async function createNote(value: string | IObject, resolver?: Resolver, s | ||||||
| 		if (actor.uri) updatePerson(actor.uri); | 		if (actor.uri) updatePerson(actor.uri); | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	if (note._misskey_talk && visibility === 'specified') { | ||||||
|  | 		for (const recipient of visibleUsers) { | ||||||
|  | 			await createMessage(actor, recipient, undefined, text || undefined, (files && files.length > 0) ? files[0] : null); | ||||||
|  | 			return null; | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	return await post(actor, { | 	return await post(actor, { | ||||||
| 		createdAt: note.published ? new Date(note.published) : null, | 		createdAt: note.published ? new Date(note.published) : null, | ||||||
| 		files, | 		files, | ||||||
|  |  | ||||||
|  | @ -12,7 +12,7 @@ import { Emoji } from '../../../models/entities/emoji'; | ||||||
| import { Poll } from '../../../models/entities/poll'; | import { Poll } from '../../../models/entities/poll'; | ||||||
| import { ensure } from '../../../prelude/ensure'; | import { ensure } from '../../../prelude/ensure'; | ||||||
| 
 | 
 | ||||||
| export default async function renderNote(note: Note, dive = true): Promise<any> { | export default async function renderNote(note: Note, dive = true, isTalk = false): Promise<any> { | ||||||
| 	const promisedFiles: Promise<DriveFile[]> = note.fileIds.length > 0 | 	const promisedFiles: Promise<DriveFile[]> = note.fileIds.length > 0 | ||||||
| 		? DriveFiles.find({ id: In(note.fileIds) }) | 		? DriveFiles.find({ id: In(note.fileIds) }) | ||||||
| 		: Promise.resolve([]); | 		: Promise.resolve([]); | ||||||
|  | @ -145,6 +145,10 @@ export default async function renderNote(note: Note, dive = true): Promise<any> | ||||||
| 		})) | 		})) | ||||||
| 	} : {}; | 	} : {}; | ||||||
| 
 | 
 | ||||||
|  | 	const asTalk = isTalk ? { | ||||||
|  | 		_misskey_talk: true | ||||||
|  | 	} : {}; | ||||||
|  | 
 | ||||||
| 	return { | 	return { | ||||||
| 		id: `${config.url}/notes/${note.id}`, | 		id: `${config.url}/notes/${note.id}`, | ||||||
| 		type: 'Note', | 		type: 'Note', | ||||||
|  | @ -160,7 +164,8 @@ export default async function renderNote(note: Note, dive = true): Promise<any> | ||||||
| 		attachment: files.map(renderDocument), | 		attachment: files.map(renderDocument), | ||||||
| 		sensitive: note.cw != null || files.some(file => file.isSensitive), | 		sensitive: note.cw != null || files.some(file => file.isSensitive), | ||||||
| 		tag, | 		tag, | ||||||
| 		...asPoll | 		...asPoll, | ||||||
|  | 		...asTalk | ||||||
| 	}; | 	}; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -75,6 +75,7 @@ export interface INote extends IObject { | ||||||
| 	type: 'Note' | 'Question' | 'Article' | 'Audio' | 'Document' | 'Image' | 'Page' | 'Video'; | 	type: 'Note' | 'Question' | 'Article' | 'Audio' | 'Document' | 'Image' | 'Page' | 'Video'; | ||||||
| 	_misskey_content?: string; | 	_misskey_content?: string; | ||||||
| 	_misskey_quote?: string; | 	_misskey_quote?: string; | ||||||
|  | 	_misskey_talk: boolean; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| export interface IQuestion extends IObject { | export interface IQuestion extends IObject { | ||||||
|  |  | ||||||
|  | @ -1,17 +1,12 @@ | ||||||
| import $ from 'cafy'; | import $ from 'cafy'; | ||||||
| import { ID } from '../../../../../misc/cafy-id'; | import { ID } from '../../../../../misc/cafy-id'; | ||||||
| import { publishMainStream, publishGroupMessagingStream } from '../../../../../services/stream'; |  | ||||||
| import { publishMessagingStream, publishMessagingIndexStream } from '../../../../../services/stream'; |  | ||||||
| import pushSw from '../../../../../services/push-notification'; |  | ||||||
| import define from '../../../define'; | import define from '../../../define'; | ||||||
| import { ApiError } from '../../../error'; | import { ApiError } from '../../../error'; | ||||||
| import { getUser } from '../../../common/getters'; | import { getUser } from '../../../common/getters'; | ||||||
| import { MessagingMessages, DriveFiles, Mutings, UserGroups, UserGroupJoinings } from '../../../../../models'; | import { MessagingMessages, DriveFiles, UserGroups, UserGroupJoinings } from '../../../../../models'; | ||||||
| import { MessagingMessage } from '../../../../../models/entities/messaging-message'; |  | ||||||
| import { genId } from '../../../../../misc/gen-id'; |  | ||||||
| import { User } from '../../../../../models/entities/user'; | import { User } from '../../../../../models/entities/user'; | ||||||
| import { UserGroup } from '../../../../../models/entities/user-group'; | import { UserGroup } from '../../../../../models/entities/user-group'; | ||||||
| import { Not } from 'typeorm'; | import { createMessage } from '../../../../../services/messages/create'; | ||||||
| 
 | 
 | ||||||
| export const meta = { | export const meta = { | ||||||
| 	desc: { | 	desc: { | ||||||
|  | @ -147,68 +142,5 @@ export default define(meta, async (ps, user) => { | ||||||
| 		throw new ApiError(meta.errors.contentRequired); | 		throw new ApiError(meta.errors.contentRequired); | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	const message = await MessagingMessages.save({ | 	return await createMessage(user, recipientUser, recipientGroup, ps.text, file); | ||||||
| 		id: genId(), |  | ||||||
| 		createdAt: new Date(), |  | ||||||
| 		fileId: file ? file.id : null, |  | ||||||
| 		recipientId: recipientUser ? recipientUser.id : null, |  | ||||||
| 		groupId: recipientGroup ? recipientGroup.id : null, |  | ||||||
| 		text: ps.text ? ps.text.trim() : null, |  | ||||||
| 		userId: user.id, |  | ||||||
| 		isRead: false, |  | ||||||
| 		reads: [] as any[] |  | ||||||
| 	} as MessagingMessage); |  | ||||||
| 
 |  | ||||||
| 	const messageObj = await MessagingMessages.pack(message); |  | ||||||
| 
 |  | ||||||
| 	if (recipientUser) { |  | ||||||
| 		// 自分のストリーム
 |  | ||||||
| 		publishMessagingStream(message.userId, recipientUser.id, 'message', messageObj); |  | ||||||
| 		publishMessagingIndexStream(message.userId, 'message', messageObj); |  | ||||||
| 		publishMainStream(message.userId, 'messagingMessage', messageObj); |  | ||||||
| 
 |  | ||||||
| 		// 相手のストリーム
 |  | ||||||
| 		publishMessagingStream(recipientUser.id, message.userId, 'message', messageObj); |  | ||||||
| 		publishMessagingIndexStream(recipientUser.id, 'message', messageObj); |  | ||||||
| 		publishMainStream(recipientUser.id, 'messagingMessage', messageObj); |  | ||||||
| 	} else if (recipientGroup) { |  | ||||||
| 		// グループのストリーム
 |  | ||||||
| 		publishGroupMessagingStream(recipientGroup.id, 'message', messageObj); |  | ||||||
| 
 |  | ||||||
| 		// メンバーのストリーム
 |  | ||||||
| 		const joinings = await UserGroupJoinings.find({ userGroupId: recipientGroup.id }); |  | ||||||
| 		for (const joining of joinings) { |  | ||||||
| 			publishMessagingIndexStream(joining.userId, 'message', messageObj); |  | ||||||
| 			publishMainStream(joining.userId, 'messagingMessage', messageObj); |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	// 2秒経っても(今回作成した)メッセージが既読にならなかったら「未読のメッセージがありますよ」イベントを発行する
 |  | ||||||
| 	setTimeout(async () => { |  | ||||||
| 		const freshMessage = await MessagingMessages.findOne(message.id); |  | ||||||
| 		if (freshMessage == null) return; // メッセージが削除されている場合もある
 |  | ||||||
| 
 |  | ||||||
| 		if (recipientUser) { |  | ||||||
| 			if (freshMessage.isRead) return; // 既読
 |  | ||||||
| 
 |  | ||||||
| 			//#region ただしミュートされているなら発行しない
 |  | ||||||
| 			const mute = await Mutings.find({ |  | ||||||
| 				muterId: recipientUser.id, |  | ||||||
| 			}); |  | ||||||
| 			if (mute.map(m => m.muteeId).includes(user.id)) return; |  | ||||||
| 			//#endregion
 |  | ||||||
| 
 |  | ||||||
| 			publishMainStream(recipientUser.id, 'unreadMessagingMessage', messageObj); |  | ||||||
| 			pushSw(recipientUser.id, 'unreadMessagingMessage', messageObj); |  | ||||||
| 		} else if (recipientGroup) { |  | ||||||
| 			const joinings = await UserGroupJoinings.find({ userGroupId: recipientGroup.id, userId: Not(user.id) }); |  | ||||||
| 			for (const joining of joinings) { |  | ||||||
| 				if (freshMessage.reads.includes(joining.userId)) return; // 既読
 |  | ||||||
| 				publishMainStream(joining.userId, 'unreadMessagingMessage', messageObj); |  | ||||||
| 				pushSw(joining.userId, 'unreadMessagingMessage', messageObj); |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 	}, 2000); |  | ||||||
| 
 |  | ||||||
| 	return messageObj; |  | ||||||
| }); | }); | ||||||
|  |  | ||||||
							
								
								
									
										105
									
								
								src/services/messages/create.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										105
									
								
								src/services/messages/create.ts
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,105 @@ | ||||||
|  | import { User } from '../../models/entities/user'; | ||||||
|  | import { UserGroup } from '../../models/entities/user-group'; | ||||||
|  | import { DriveFile } from '../../models/entities/drive-file'; | ||||||
|  | import { MessagingMessages, UserGroupJoinings, Mutings, Users } from '../../models'; | ||||||
|  | import { genId } from '../../misc/gen-id'; | ||||||
|  | import { MessagingMessage } from '../../models/entities/messaging-message'; | ||||||
|  | import { publishMessagingStream, publishMessagingIndexStream, publishMainStream, publishGroupMessagingStream } from '../stream'; | ||||||
|  | import pushNotification from '../push-notification'; | ||||||
|  | import { Not } from 'typeorm'; | ||||||
|  | import { Note } from '../../models/entities/note'; | ||||||
|  | import renderNote from '../../remote/activitypub/renderer/note'; | ||||||
|  | import renderCreate from '../../remote/activitypub/renderer/create'; | ||||||
|  | import { renderActivity } from '../../remote/activitypub/renderer'; | ||||||
|  | import { deliver } from '../../queue'; | ||||||
|  | 
 | ||||||
|  | export async function createMessage(user: User, recipientUser: User | undefined, recipientGroup: UserGroup | undefined, text: string | undefined, file: DriveFile | null) { | ||||||
|  | 	const message = await MessagingMessages.save({ | ||||||
|  | 		id: genId(), | ||||||
|  | 		createdAt: new Date(), | ||||||
|  | 		fileId: file ? file.id : null, | ||||||
|  | 		recipientId: recipientUser ? recipientUser.id : null, | ||||||
|  | 		groupId: recipientGroup ? recipientGroup.id : null, | ||||||
|  | 		text: text ? text.trim() : null, | ||||||
|  | 		userId: user.id, | ||||||
|  | 		isRead: false, | ||||||
|  | 		reads: [] as any[] | ||||||
|  | 	} as MessagingMessage); | ||||||
|  | 
 | ||||||
|  | 	const messageObj = await MessagingMessages.pack(message); | ||||||
|  | 
 | ||||||
|  | 	if (recipientUser) { | ||||||
|  | 		if (Users.isLocalUser(user)) { | ||||||
|  | 			// 自分のストリーム
 | ||||||
|  | 			publishMessagingStream(message.userId, recipientUser.id, 'message', messageObj); | ||||||
|  | 			publishMessagingIndexStream(message.userId, 'message', messageObj); | ||||||
|  | 			publishMainStream(message.userId, 'messagingMessage', messageObj); | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if (Users.isLocalUser(recipientUser)) { | ||||||
|  | 			// 相手のストリーム
 | ||||||
|  | 			publishMessagingStream(recipientUser.id, message.userId, 'message', messageObj); | ||||||
|  | 			publishMessagingIndexStream(recipientUser.id, 'message', messageObj); | ||||||
|  | 			publishMainStream(recipientUser.id, 'messagingMessage', messageObj); | ||||||
|  | 		} | ||||||
|  | 	} else if (recipientGroup) { | ||||||
|  | 		// グループのストリーム
 | ||||||
|  | 		publishGroupMessagingStream(recipientGroup.id, 'message', messageObj); | ||||||
|  | 
 | ||||||
|  | 		// メンバーのストリーム
 | ||||||
|  | 		const joinings = await UserGroupJoinings.find({ userGroupId: recipientGroup.id }); | ||||||
|  | 		for (const joining of joinings) { | ||||||
|  | 			publishMessagingIndexStream(joining.userId, 'message', messageObj); | ||||||
|  | 			publishMainStream(joining.userId, 'messagingMessage', messageObj); | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// 2秒経っても(今回作成した)メッセージが既読にならなかったら「未読のメッセージがありますよ」イベントを発行する
 | ||||||
|  | 	setTimeout(async () => { | ||||||
|  | 		const freshMessage = await MessagingMessages.findOne(message.id); | ||||||
|  | 		if (freshMessage == null) return; // メッセージが削除されている場合もある
 | ||||||
|  | 
 | ||||||
|  | 		if (recipientUser && Users.isLocalUser(recipientUser)) { | ||||||
|  | 			if (freshMessage.isRead) return; // 既読
 | ||||||
|  | 
 | ||||||
|  | 			//#region ただしミュートされているなら発行しない
 | ||||||
|  | 			const mute = await Mutings.find({ | ||||||
|  | 				muterId: recipientUser.id, | ||||||
|  | 			}); | ||||||
|  | 			if (mute.map(m => m.muteeId).includes(user.id)) return; | ||||||
|  | 			//#endregion
 | ||||||
|  | 
 | ||||||
|  | 			publishMainStream(recipientUser.id, 'unreadMessagingMessage', messageObj); | ||||||
|  | 			pushNotification(recipientUser.id, 'unreadMessagingMessage', messageObj); | ||||||
|  | 		} else if (recipientGroup) { | ||||||
|  | 			const joinings = await UserGroupJoinings.find({ userGroupId: recipientGroup.id, userId: Not(user.id) }); | ||||||
|  | 			for (const joining of joinings) { | ||||||
|  | 				if (freshMessage.reads.includes(joining.userId)) return; // 既読
 | ||||||
|  | 				publishMainStream(joining.userId, 'unreadMessagingMessage', messageObj); | ||||||
|  | 				pushNotification(joining.userId, 'unreadMessagingMessage', messageObj); | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}, 2000); | ||||||
|  | 
 | ||||||
|  | 	if (recipientUser && Users.isLocalUser(user) && Users.isRemoteUser(recipientUser)) { | ||||||
|  | 		const note = { | ||||||
|  | 			id: message.id, | ||||||
|  | 			createdAt: message.createdAt, | ||||||
|  | 			fileIds: message.fileId ? [ message.fileId ] : [], | ||||||
|  | 			text: message.text, | ||||||
|  | 			userId: message.userId, | ||||||
|  | 			visibility: 'specified', | ||||||
|  | 			mentions: [ recipientUser ].map(u => u.id), | ||||||
|  | 			mentionedRemoteUsers: JSON.stringify([ recipientUser ].map(u => ({ | ||||||
|  | 				uri: u.uri, | ||||||
|  | 				username: u.username, | ||||||
|  | 				host: u.host | ||||||
|  | 			}))), | ||||||
|  | 		} as Note; | ||||||
|  | 
 | ||||||
|  | 		const activity = renderActivity(renderCreate(await renderNote(note, false, true), note)); | ||||||
|  | 
 | ||||||
|  | 		deliver(user, activity, recipientUser.inbox); | ||||||
|  | 	} | ||||||
|  | 	return messageObj; | ||||||
|  | } | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue