From 3d9ac6387e4f698c15bdca0d8dbd1f0a2b02161f Mon Sep 17 00:00:00 2001 From: syuilo Date: Wed, 25 Apr 2018 18:04:16 +0900 Subject: [PATCH] wip --- src/publishers/stream.ts | 5 + src/server/api/endpoints/users/lists/push.ts | 5 +- src/services/note/create.ts | 106 +++++++++++-------- 3 files changed, 69 insertions(+), 47 deletions(-) diff --git a/src/publishers/stream.ts b/src/publishers/stream.ts index 2ecbfa0dd..dcc03e39f 100644 --- a/src/publishers/stream.ts +++ b/src/publishers/stream.ts @@ -25,6 +25,10 @@ class MisskeyEvent { this.publish(`note-stream:${noteId}`, type, typeof value === 'undefined' ? null : value); } + public publishUserListStream(listId: ID, type: string, value?: any): void { + this.publish(`user-list-stream:${listId}`, type, typeof value === 'undefined' ? null : value); + } + public publishMessagingStream(userId: ID, otherpartyId: ID, type: string, value?: any): void { this.publish(`messaging-stream:${userId}-${otherpartyId}`, type, typeof value === 'undefined' ? null : value); } @@ -69,6 +73,7 @@ export default ev.publishUserStream.bind(ev); export const publishLocalTimelineStream = ev.publishLocalTimelineStream.bind(ev); export const publishGlobalTimelineStream = ev.publishGlobalTimelineStream.bind(ev); export const publishDriveStream = ev.publishDriveStream.bind(ev); +export const publishUserListStream = ev.publishUserListStream.bind(ev); export const publishNoteStream = ev.publishNoteStream.bind(ev); export const publishMessagingStream = ev.publishMessagingStream.bind(ev); export const publishMessagingIndexStream = ev.publishMessagingIndexStream.bind(ev); diff --git a/src/server/api/endpoints/users/lists/push.ts b/src/server/api/endpoints/users/lists/push.ts index f21234775..467c08efd 100644 --- a/src/server/api/endpoints/users/lists/push.ts +++ b/src/server/api/endpoints/users/lists/push.ts @@ -1,6 +1,7 @@ import $ from 'cafy'; import ID from '../../../../../cafy-id'; import UserList from '../../../../../models/user-list'; -import User from '../../../../../models/user'; +import User, { pack as packUser } from '../../../../../models/user'; +import { publishUserListStream } from '../../../../../publishers/stream'; /** * Add a user to a user list @@ -45,4 +46,6 @@ module.exports = async (params, me) => new Promise(async (res, rej) => { }); res(); + + publishUserListStream(userList._id, 'userAdded', await packUser(user)); }); diff --git a/src/services/note/create.ts b/src/services/note/create.ts index e5ad96898..4808edfda 100644 --- a/src/services/note/create.ts +++ b/src/services/note/create.ts @@ -1,6 +1,6 @@ import Note, { pack, INote } from '../../models/note'; import User, { isLocalUser, IUser, isRemoteUser } from '../../models/user'; -import stream, { publishLocalTimelineStream, publishGlobalTimelineStream } from '../../publishers/stream'; +import stream, { publishLocalTimelineStream, publishGlobalTimelineStream, publishUserListStream } from '../../publishers/stream'; import Following from '../../models/following'; import { deliver } from '../../queue'; import renderNote from '../../remote/activitypub/renderer/note'; @@ -16,6 +16,7 @@ import pushSw from '../../publishers/push-sw'; import event from '../../publishers/stream'; import parse from '../../text/parse'; import { IApp } from '../../models/app'; +import UserList from '../../models/user-list'; export default async (user: IUser, data: { createdAt?: Date; @@ -110,60 +111,73 @@ export default async (user: IUser, data: { // タイムラインへの投稿 if (note.channelId == null) { - if (isLocalUser(user)) { - // Publish event to myself's stream - stream(note.userId, 'note', noteObj); - - // Publish note to local timeline stream - publishLocalTimelineStream(noteObj); - } - - // Publish note to global timeline stream - publishGlobalTimelineStream(noteObj); - - // Fetch all followers - const followers = await Following.find({ - followeeId: note.userId - }); - if (!silent) { - const render = async () => { - const content = data.renote && data.text == null - ? renderAnnounce(data.renote.uri ? data.renote.uri : await renderNote(data.renote)) - : renderCreate(await renderNote(note)); - return packAp(content); - }; + if (isLocalUser(user)) { + // Publish event to myself's stream + stream(note.userId, 'note', noteObj); - // 投稿がリプライかつ投稿者がローカルユーザーかつリプライ先の投稿の投稿者がリモートユーザーなら配送 - if (data.reply && isLocalUser(user) && isRemoteUser(data.reply._user)) { - deliver(user, await render(), data.reply._user.inbox); + // Publish note to local timeline stream + publishLocalTimelineStream(noteObj); } - // 投稿がRenoteかつ投稿者がローカルユーザーかつRenote元の投稿の投稿者がリモートユーザーなら配送 - if (data.renote && isLocalUser(user) && isRemoteUser(data.renote._user)) { - deliver(user, await render(), data.renote._user.inbox); - } + // Publish note to global timeline stream + publishGlobalTimelineStream(noteObj); - Promise.all(followers.map(async following => { - const follower = following._follower; + // フォロワーに配信 + Following.find({ + followeeId: note.userId + }).then(followers => { + followers.map(async following => { + const follower = following._follower; - if (isLocalUser(follower)) { - // ストーキングしていない場合 - if (!following.stalk) { - // この投稿が返信ならスキップ - if (note.replyId && !note._reply.userId.equals(following.followerId) && !note._reply.userId.equals(note.userId)) return; + if (isLocalUser(follower)) { + // ストーキングしていない場合 + if (!following.stalk) { + // この投稿が返信ならスキップ + if (note.replyId && !note._reply.userId.equals(following.followerId) && !note._reply.userId.equals(note.userId)) return; + } + + // Publish event to followers stream + stream(following.followerId, 'note', noteObj); + } else { + //#region AP配送 + // フォロワーがリモートユーザーかつ投稿者がローカルユーザーなら投稿を配信 + if (isLocalUser(user)) { + deliver(user, await render(), follower.inbox); + } + //#endergion } + }); + }); - // Publish event to followers stream - stream(following.followerId, 'note', noteObj); - } else { - // フォロワーがリモートユーザーかつ投稿者がローカルユーザーなら投稿を配信 - if (isLocalUser(user)) { - deliver(user, await render(), follower.inbox); - } - } - })); + // リストに配信 + UserList.find({ + userIds: note.userId + }).then(lists => { + lists.forEach(list => { + publishUserListStream(list._id, 'note', noteObj); + }); + }); } + + //#region AP配送 + const render = async () => { + const content = data.renote && data.text == null + ? renderAnnounce(data.renote.uri ? data.renote.uri : await renderNote(data.renote)) + : renderCreate(await renderNote(note)); + return packAp(content); + }; + + // 投稿がリプライかつ投稿者がローカルユーザーかつリプライ先の投稿の投稿者がリモートユーザーなら配送 + if (data.reply && isLocalUser(user) && isRemoteUser(data.reply._user)) { + deliver(user, await render(), data.reply._user.inbox); + } + + // 投稿がRenoteかつ投稿者がローカルユーザーかつRenote元の投稿の投稿者がリモートユーザーなら配送 + if (data.renote && isLocalUser(user) && isRemoteUser(data.renote._user)) { + deliver(user, await render(), data.renote._user.inbox); + } + //#endergion } // チャンネルへの投稿