Implement Talk has read federation (#5636)
* Talk read * fix * 複数のRead ActivityはCollectionとして送るように * あ
This commit is contained in:
		
							parent
							
								
									648be3005f
								
							
						
					
					
						commit
						3e85aad80a
					
				
					 12 changed files with 108 additions and 10 deletions
				
			
		
							
								
								
									
										14
									
								
								migration/1576269851876-TalkFederationId.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										14
									
								
								migration/1576269851876-TalkFederationId.ts
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,14 @@ | ||||||
|  | import {MigrationInterface, QueryRunner} from "typeorm"; | ||||||
|  | 
 | ||||||
|  | export class TalkFederationId1576269851876 implements MigrationInterface { | ||||||
|  |     name = 'TalkFederationId1576269851876' | ||||||
|  | 
 | ||||||
|  |     public async up(queryRunner: QueryRunner): Promise<any> { | ||||||
|  |         await queryRunner.query(`ALTER TABLE "messaging_message" ADD "uri" character varying(512)`, undefined); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     public async down(queryRunner: QueryRunner): Promise<any> { | ||||||
|  |         await queryRunner.query(`ALTER TABLE "messaging_message" DROP COLUMN "uri"`, undefined); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -64,6 +64,11 @@ export class MessagingMessage { | ||||||
| 	}) | 	}) | ||||||
| 	public isRead: boolean; | 	public isRead: boolean; | ||||||
| 
 | 
 | ||||||
|  | 	@Column('varchar', { | ||||||
|  | 		length: 512, nullable: true, | ||||||
|  | 	}) | ||||||
|  | 	public uri: string | null; | ||||||
|  | 
 | ||||||
| 	@Column({ | 	@Column({ | ||||||
| 		...id(), | 		...id(), | ||||||
| 		array: true, default: '{}' | 		array: true, default: '{}' | ||||||
|  |  | ||||||
|  | @ -1,8 +1,9 @@ | ||||||
| import { IObject, isCreate, isDelete, isUpdate, isFollow, isAccept, isReject, isAdd, isRemove, isAnnounce, isLike, isUndo, isBlock, isCollectionOrOrderedCollection, isCollection  } from '../type'; | import { IObject, isCreate, isDelete, isUpdate, isRead, isFollow, isAccept, isReject, isAdd, isRemove, isAnnounce, isLike, isUndo, isBlock, isCollectionOrOrderedCollection, isCollection } from '../type'; | ||||||
| import { IRemoteUser } from '../../../models/entities/user'; | import { IRemoteUser } from '../../../models/entities/user'; | ||||||
| import create from './create'; | import create from './create'; | ||||||
| import performDeleteActivity from './delete'; | import performDeleteActivity from './delete'; | ||||||
| import performUpdateActivity from './update'; | import performUpdateActivity from './update'; | ||||||
|  | import { performReadActivity } from './read'; | ||||||
| import follow from './follow'; | import follow from './follow'; | ||||||
| import undo from './undo'; | import undo from './undo'; | ||||||
| import like from './like'; | import like from './like'; | ||||||
|  | @ -41,6 +42,8 @@ async function performOneActivity(actor: IRemoteUser, activity: IObject): Promis | ||||||
| 		await performDeleteActivity(actor, activity); | 		await performDeleteActivity(actor, activity); | ||||||
| 	} else if (isUpdate(activity)) { | 	} else if (isUpdate(activity)) { | ||||||
| 		await performUpdateActivity(actor, activity); | 		await performUpdateActivity(actor, activity); | ||||||
|  | 	} else if (isRead(activity)) { | ||||||
|  | 		await performReadActivity(actor, activity); | ||||||
| 	} else if (isFollow(activity)) { | 	} else if (isFollow(activity)) { | ||||||
| 		await follow(actor, activity); | 		await follow(actor, activity); | ||||||
| 	} else if (isAccept(activity)) { | 	} else if (isAccept(activity)) { | ||||||
|  |  | ||||||
							
								
								
									
										27
									
								
								src/remote/activitypub/kernel/read.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										27
									
								
								src/remote/activitypub/kernel/read.ts
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,27 @@ | ||||||
|  | import { IRemoteUser } from '../../../models/entities/user'; | ||||||
|  | import { IRead, getApId } from '../type'; | ||||||
|  | import { isSelfHost, extractDbHost } from '../../../misc/convert-host'; | ||||||
|  | import { MessagingMessages } from '../../../models'; | ||||||
|  | import { readUserMessagingMessage } from '../../../server/api/common/read-messaging-message'; | ||||||
|  | 
 | ||||||
|  | export const performReadActivity = async (actor: IRemoteUser, activity: IRead): Promise<string> => { | ||||||
|  | 	const id = await getApId(activity.object); | ||||||
|  | 
 | ||||||
|  | 	if (!isSelfHost(extractDbHost(id))) { | ||||||
|  | 		return `skip: Read to foreign host (${id})`; | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	const messageId = id.split('/').pop(); | ||||||
|  | 
 | ||||||
|  | 	const message = await MessagingMessages.findOne(messageId); | ||||||
|  | 	if (message == null) { | ||||||
|  | 		return `skip: message not found`; | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if (actor.id != message.recipientId) { | ||||||
|  | 		return `skip: actor is not a message recipient`; | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	await readUserMessagingMessage(message.recipientId!, message.userId, [message.id]); | ||||||
|  | 	return `ok: mark as read (${message.userId} => ${message.recipientId} ${message.id})`; | ||||||
|  | }; | ||||||
|  | @ -226,7 +226,7 @@ export async function createNote(value: string | IObject, resolver?: Resolver, s | ||||||
| 
 | 
 | ||||||
| 	if (note._misskey_talk && visibility === 'specified') { | 	if (note._misskey_talk && visibility === 'specified') { | ||||||
| 		for (const recipient of visibleUsers) { | 		for (const recipient of visibleUsers) { | ||||||
| 			await createMessage(actor, recipient, undefined, text || undefined, (files && files.length > 0) ? files[0] : null); | 			await createMessage(actor, recipient, undefined, text || undefined, (files && files.length > 0) ? files[0] : null, object.id); | ||||||
| 			return null; | 			return null; | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | @ -6,7 +6,7 @@ | ||||||
|  * @param last URL of last page (optional) |  * @param last URL of last page (optional) | ||||||
|  * @param orderedItems attached objects (optional) |  * @param orderedItems attached objects (optional) | ||||||
|  */ |  */ | ||||||
| export default function(id: string, totalItems: any, first?: string, last?: string, orderedItems?: object) { | export default function(id: string | null, totalItems: any, first?: string, last?: string, orderedItems?: object) { | ||||||
| 	const page: any = { | 	const page: any = { | ||||||
| 		id, | 		id, | ||||||
| 		type: 'OrderedCollection', | 		type: 'OrderedCollection', | ||||||
|  |  | ||||||
							
								
								
									
										9
									
								
								src/remote/activitypub/renderer/read.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										9
									
								
								src/remote/activitypub/renderer/read.ts
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,9 @@ | ||||||
|  | import config from '../../../config'; | ||||||
|  | import { ILocalUser } from '../../../models/entities/user'; | ||||||
|  | import { MessagingMessage } from '../../../models/entities/messaging-message'; | ||||||
|  | 
 | ||||||
|  | export const renderReadActivity = (user: ILocalUser, message: MessagingMessage) => ({ | ||||||
|  | 	type: 'Read', | ||||||
|  | 	actor: `${config.url}/users/${user.id}`, | ||||||
|  | 	object: message.uri | ||||||
|  | }); | ||||||
|  | @ -140,6 +140,10 @@ export interface IUpdate extends IActivity { | ||||||
| 	type: 'Update'; | 	type: 'Update'; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | export interface IRead extends IActivity { | ||||||
|  | 	type: 'Read'; | ||||||
|  | } | ||||||
|  | 
 | ||||||
| export interface IUndo extends IActivity { | export interface IUndo extends IActivity { | ||||||
| 	type: 'Undo'; | 	type: 'Undo'; | ||||||
| } | } | ||||||
|  | @ -180,6 +184,7 @@ export interface IBlock extends IActivity { | ||||||
| export const isCreate = (object: IObject): object is ICreate => object.type === 'Create'; | export const isCreate = (object: IObject): object is ICreate => object.type === 'Create'; | ||||||
| export const isDelete = (object: IObject): object is IDelete => object.type === 'Delete'; | export const isDelete = (object: IObject): object is IDelete => object.type === 'Delete'; | ||||||
| export const isUpdate = (object: IObject): object is IUpdate => object.type === 'Update'; | export const isUpdate = (object: IObject): object is IUpdate => object.type === 'Update'; | ||||||
|  | export const isRead = (object: IObject): object is IRead => object.type === 'Read'; | ||||||
| export const isUndo = (object: IObject): object is IUndo => object.type === 'Undo'; | export const isUndo = (object: IObject): object is IUndo => object.type === 'Undo'; | ||||||
| export const isFollow = (object: IObject): object is IFollow => object.type === 'Follow'; | export const isFollow = (object: IObject): object is IFollow => object.type === 'Follow'; | ||||||
| export const isAccept = (object: IObject): object is IAccept => object.type === 'Accept'; | export const isAccept = (object: IObject): object is IAccept => object.type === 'Accept'; | ||||||
|  |  | ||||||
|  | @ -1,12 +1,17 @@ | ||||||
| import { publishMainStream, publishGroupMessagingStream } from '../../../services/stream'; | import { publishMainStream, publishGroupMessagingStream } from '../../../services/stream'; | ||||||
| import { publishMessagingStream } from '../../../services/stream'; | import { publishMessagingStream } from '../../../services/stream'; | ||||||
| import { publishMessagingIndexStream } from '../../../services/stream'; | import { publishMessagingIndexStream } from '../../../services/stream'; | ||||||
| import { User } from '../../../models/entities/user'; | import { User, ILocalUser, IRemoteUser } from '../../../models/entities/user'; | ||||||
| import { MessagingMessage } from '../../../models/entities/messaging-message'; | import { MessagingMessage } from '../../../models/entities/messaging-message'; | ||||||
| import { MessagingMessages, UserGroupJoinings, Users } from '../../../models'; | import { MessagingMessages, UserGroupJoinings, Users } from '../../../models'; | ||||||
| import { In } from 'typeorm'; | import { In } from 'typeorm'; | ||||||
| import { IdentifiableError } from '../../../misc/identifiable-error'; | import { IdentifiableError } from '../../../misc/identifiable-error'; | ||||||
| import { UserGroup } from '../../../models/entities/user-group'; | import { UserGroup } from '../../../models/entities/user-group'; | ||||||
|  | import { toArray } from '../../../prelude/array'; | ||||||
|  | import { renderReadActivity } from '../../../remote/activitypub/renderer/read'; | ||||||
|  | import { renderActivity } from '../../../remote/activitypub/renderer'; | ||||||
|  | import { deliver } from '../../../queue'; | ||||||
|  | import orderedCollection from '../../../remote/activitypub/renderer/ordered-collection'; | ||||||
| 
 | 
 | ||||||
| /** | /** | ||||||
|  * Mark messages as read |  * Mark messages as read | ||||||
|  | @ -101,3 +106,17 @@ export async function readGroupMessagingMessage( | ||||||
| 		publishMainStream(userId, 'readAllMessagingMessages'); | 		publishMainStream(userId, 'readAllMessagingMessages'); | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | export async function deliverReadActivity(user: ILocalUser, recipient: IRemoteUser, messages: MessagingMessage | MessagingMessage[]) { | ||||||
|  | 	messages = toArray(messages).filter(x => x.uri); | ||||||
|  | 	const contents = messages.map(x => renderReadActivity(user, x)); | ||||||
|  | 
 | ||||||
|  | 	if (contents.length > 1) { | ||||||
|  | 		const collection = orderedCollection(null, contents.length, undefined, undefined, contents); | ||||||
|  | 		deliver(user, renderActivity(collection), recipient.inbox); | ||||||
|  | 	} else { | ||||||
|  | 		for (const content of contents) { | ||||||
|  | 			deliver(user, renderActivity(content), recipient.inbox); | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | @ -3,10 +3,10 @@ import { ID } from '../../../../misc/cafy-id'; | ||||||
| import define from '../../define'; | import define from '../../define'; | ||||||
| import { ApiError } from '../../error'; | import { ApiError } from '../../error'; | ||||||
| import { getUser } from '../../common/getters'; | import { getUser } from '../../common/getters'; | ||||||
| import { MessagingMessages, UserGroups, UserGroupJoinings } from '../../../../models'; | import { MessagingMessages, UserGroups, UserGroupJoinings, Users } from '../../../../models'; | ||||||
| import { makePaginationQuery } from '../../common/make-pagination-query'; | import { makePaginationQuery } from '../../common/make-pagination-query'; | ||||||
| import { Brackets } from 'typeorm'; | import { Brackets } from 'typeorm'; | ||||||
| import { readUserMessagingMessage, readGroupMessagingMessage } from '../../common/read-messaging-message'; | import { readUserMessagingMessage, readGroupMessagingMessage, deliverReadActivity } from '../../common/read-messaging-message'; | ||||||
| 
 | 
 | ||||||
| export const meta = { | export const meta = { | ||||||
| 	desc: { | 	desc: { | ||||||
|  | @ -114,6 +114,11 @@ export default define(meta, async (ps, user) => { | ||||||
| 		// Mark all as read
 | 		// Mark all as read
 | ||||||
| 		if (ps.markAsRead) { | 		if (ps.markAsRead) { | ||||||
| 			readUserMessagingMessage(user.id, recipient.id, messages.filter(m => m.recipientId === user.id).map(x => x.id)); | 			readUserMessagingMessage(user.id, recipient.id, messages.filter(m => m.recipientId === user.id).map(x => x.id)); | ||||||
|  | 
 | ||||||
|  | 			// リモートユーザーとのメッセージだったら既読配信
 | ||||||
|  | 			if (Users.isLocalUser(user) && Users.isRemoteUser(recipient)) { | ||||||
|  | 				deliverReadActivity(user, recipient, messages); | ||||||
|  | 			} | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		return await Promise.all(messages.map(message => MessagingMessages.pack(message, user, { | 		return await Promise.all(messages.map(message => MessagingMessages.pack(message, user, { | ||||||
|  |  | ||||||
|  | @ -1,7 +1,8 @@ | ||||||
| import autobind from 'autobind-decorator'; | import autobind from 'autobind-decorator'; | ||||||
| import { readUserMessagingMessage, readGroupMessagingMessage } from '../../common/read-messaging-message'; | import { readUserMessagingMessage, readGroupMessagingMessage, deliverReadActivity } from '../../common/read-messaging-message'; | ||||||
| import Channel from '../channel'; | import Channel from '../channel'; | ||||||
| import { UserGroupJoinings } from '../../../../models'; | import { UserGroupJoinings, Users, MessagingMessages } from '../../../../models'; | ||||||
|  | import { User, ILocalUser, IRemoteUser } from '../../../../models/entities/user'; | ||||||
| 
 | 
 | ||||||
| export default class extends Channel { | export default class extends Channel { | ||||||
| 	public readonly chName = 'messaging'; | 	public readonly chName = 'messaging'; | ||||||
|  | @ -9,11 +10,13 @@ export default class extends Channel { | ||||||
| 	public static requireCredential = true; | 	public static requireCredential = true; | ||||||
| 
 | 
 | ||||||
| 	private otherpartyId: string | null; | 	private otherpartyId: string | null; | ||||||
|  | 	private otherparty?: User; | ||||||
| 	private groupId: string | null; | 	private groupId: string | null; | ||||||
| 
 | 
 | ||||||
| 	@autobind | 	@autobind | ||||||
| 	public async init(params: any) { | 	public async init(params: any) { | ||||||
| 		this.otherpartyId = params.otherparty as string; | 		this.otherpartyId = params.otherparty as string; | ||||||
|  | 		this.otherparty = await Users.findOne({ id: this.otherpartyId }); | ||||||
| 		this.groupId = params.group as string; | 		this.groupId = params.group as string; | ||||||
| 
 | 
 | ||||||
| 		// Check joining
 | 		// Check joining
 | ||||||
|  | @ -44,6 +47,13 @@ export default class extends Channel { | ||||||
| 			case 'read': | 			case 'read': | ||||||
| 				if (this.otherpartyId) { | 				if (this.otherpartyId) { | ||||||
| 					readUserMessagingMessage(this.user!.id, this.otherpartyId, [body.id]); | 					readUserMessagingMessage(this.user!.id, this.otherpartyId, [body.id]); | ||||||
|  | 
 | ||||||
|  | 					// リモートユーザーからのメッセージだったら既読配信
 | ||||||
|  | 					if (Users.isLocalUser(this.user!) && Users.isRemoteUser(this.otherparty!)) { | ||||||
|  | 						MessagingMessages.findOne(body.id).then(message => { | ||||||
|  | 							if (message) deliverReadActivity(this.user as ILocalUser, this.otherparty as IRemoteUser, message); | ||||||
|  | 						}); | ||||||
|  | 					} | ||||||
| 				} else if (this.groupId) { | 				} else if (this.groupId) { | ||||||
| 					readGroupMessagingMessage(this.user!.id, this.groupId, [body.id]); | 					readGroupMessagingMessage(this.user!.id, this.groupId, [body.id]); | ||||||
| 				} | 				} | ||||||
|  |  | ||||||
|  | @ -13,7 +13,7 @@ import renderCreate from '../../remote/activitypub/renderer/create'; | ||||||
| import { renderActivity } from '../../remote/activitypub/renderer'; | import { renderActivity } from '../../remote/activitypub/renderer'; | ||||||
| import { deliver } from '../../queue'; | import { deliver } from '../../queue'; | ||||||
| 
 | 
 | ||||||
| export async function createMessage(user: User, recipientUser: User | undefined, recipientGroup: UserGroup | undefined, text: string | undefined, file: DriveFile | null) { | export async function createMessage(user: User, recipientUser: User | undefined, recipientGroup: UserGroup | undefined, text: string | undefined, file: DriveFile | null, uri?: string) { | ||||||
| 	const message = await MessagingMessages.save({ | 	const message = await MessagingMessages.save({ | ||||||
| 		id: genId(), | 		id: genId(), | ||||||
| 		createdAt: new Date(), | 		createdAt: new Date(), | ||||||
|  | @ -23,7 +23,8 @@ export async function createMessage(user: User, recipientUser: User | undefined, | ||||||
| 		text: text ? text.trim() : null, | 		text: text ? text.trim() : null, | ||||||
| 		userId: user.id, | 		userId: user.id, | ||||||
| 		isRead: false, | 		isRead: false, | ||||||
| 		reads: [] as any[] | 		reads: [] as any[], | ||||||
|  | 		uri | ||||||
| 	} as MessagingMessage); | 	} as MessagingMessage); | ||||||
| 
 | 
 | ||||||
| 	const messageObj = await MessagingMessages.pack(message); | 	const messageObj = await MessagingMessages.pack(message); | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue