This commit is contained in:
syuilo 2018-04-25 18:04:16 +09:00
parent 4c9b4cb80d
commit 3d9ac6387e
3 changed files with 69 additions and 47 deletions

View File

@ -25,6 +25,10 @@ class MisskeyEvent {
this.publish(`note-stream:${noteId}`, type, typeof value === 'undefined' ? null : value); this.publish(`note-stream:${noteId}`, type, typeof value === 'undefined' ? null : value);
} }
public publishUserListStream(listId: ID, type: string, value?: any): void {
this.publish(`user-list-stream:${listId}`, type, typeof value === 'undefined' ? null : value);
}
public publishMessagingStream(userId: ID, otherpartyId: ID, type: string, value?: any): void { public publishMessagingStream(userId: ID, otherpartyId: ID, type: string, value?: any): void {
this.publish(`messaging-stream:${userId}-${otherpartyId}`, type, typeof value === 'undefined' ? null : value); this.publish(`messaging-stream:${userId}-${otherpartyId}`, type, typeof value === 'undefined' ? null : value);
} }
@ -69,6 +73,7 @@ export default ev.publishUserStream.bind(ev);
export const publishLocalTimelineStream = ev.publishLocalTimelineStream.bind(ev); export const publishLocalTimelineStream = ev.publishLocalTimelineStream.bind(ev);
export const publishGlobalTimelineStream = ev.publishGlobalTimelineStream.bind(ev); export const publishGlobalTimelineStream = ev.publishGlobalTimelineStream.bind(ev);
export const publishDriveStream = ev.publishDriveStream.bind(ev); export const publishDriveStream = ev.publishDriveStream.bind(ev);
export const publishUserListStream = ev.publishUserListStream.bind(ev);
export const publishNoteStream = ev.publishNoteStream.bind(ev); export const publishNoteStream = ev.publishNoteStream.bind(ev);
export const publishMessagingStream = ev.publishMessagingStream.bind(ev); export const publishMessagingStream = ev.publishMessagingStream.bind(ev);
export const publishMessagingIndexStream = ev.publishMessagingIndexStream.bind(ev); export const publishMessagingIndexStream = ev.publishMessagingIndexStream.bind(ev);

View File

@ -1,6 +1,7 @@
import $ from 'cafy'; import ID from '../../../../../cafy-id'; import $ from 'cafy'; import ID from '../../../../../cafy-id';
import UserList from '../../../../../models/user-list'; import UserList from '../../../../../models/user-list';
import User from '../../../../../models/user'; import User, { pack as packUser } from '../../../../../models/user';
import { publishUserListStream } from '../../../../../publishers/stream';
/** /**
* Add a user to a user list * Add a user to a user list
@ -45,4 +46,6 @@ module.exports = async (params, me) => new Promise(async (res, rej) => {
}); });
res(); res();
publishUserListStream(userList._id, 'userAdded', await packUser(user));
}); });

View File

@ -1,6 +1,6 @@
import Note, { pack, INote } from '../../models/note'; import Note, { pack, INote } from '../../models/note';
import User, { isLocalUser, IUser, isRemoteUser } from '../../models/user'; import User, { isLocalUser, IUser, isRemoteUser } from '../../models/user';
import stream, { publishLocalTimelineStream, publishGlobalTimelineStream } from '../../publishers/stream'; import stream, { publishLocalTimelineStream, publishGlobalTimelineStream, publishUserListStream } from '../../publishers/stream';
import Following from '../../models/following'; import Following from '../../models/following';
import { deliver } from '../../queue'; import { deliver } from '../../queue';
import renderNote from '../../remote/activitypub/renderer/note'; import renderNote from '../../remote/activitypub/renderer/note';
@ -16,6 +16,7 @@ import pushSw from '../../publishers/push-sw';
import event from '../../publishers/stream'; import event from '../../publishers/stream';
import parse from '../../text/parse'; import parse from '../../text/parse';
import { IApp } from '../../models/app'; import { IApp } from '../../models/app';
import UserList from '../../models/user-list';
export default async (user: IUser, data: { export default async (user: IUser, data: {
createdAt?: Date; createdAt?: Date;
@ -110,6 +111,7 @@ export default async (user: IUser, data: {
// タイムラインへの投稿 // タイムラインへの投稿
if (note.channelId == null) { if (note.channelId == null) {
if (!silent) {
if (isLocalUser(user)) { if (isLocalUser(user)) {
// Publish event to myself's stream // Publish event to myself's stream
stream(note.userId, 'note', noteObj); stream(note.userId, 'note', noteObj);
@ -121,12 +123,44 @@ export default async (user: IUser, data: {
// Publish note to global timeline stream // Publish note to global timeline stream
publishGlobalTimelineStream(noteObj); publishGlobalTimelineStream(noteObj);
// Fetch all followers // フォロワーに配信
const followers = await Following.find({ Following.find({
followeeId: note.userId followeeId: note.userId
}).then(followers => {
followers.map(async following => {
const follower = following._follower;
if (isLocalUser(follower)) {
// ストーキングしていない場合
if (!following.stalk) {
// この投稿が返信ならスキップ
if (note.replyId && !note._reply.userId.equals(following.followerId) && !note._reply.userId.equals(note.userId)) return;
}
// Publish event to followers stream
stream(following.followerId, 'note', noteObj);
} else {
//#region AP配送
// フォロワーがリモートユーザーかつ投稿者がローカルユーザーなら投稿を配信
if (isLocalUser(user)) {
deliver(user, await render(), follower.inbox);
}
//#endergion
}
});
}); });
if (!silent) { // リストに配信
UserList.find({
userIds: note.userId
}).then(lists => {
lists.forEach(list => {
publishUserListStream(list._id, 'note', noteObj);
});
});
}
//#region AP配送
const render = async () => { const render = async () => {
const content = data.renote && data.text == null const content = data.renote && data.text == null
? renderAnnounce(data.renote.uri ? data.renote.uri : await renderNote(data.renote)) ? renderAnnounce(data.renote.uri ? data.renote.uri : await renderNote(data.renote))
@ -143,27 +177,7 @@ export default async (user: IUser, data: {
if (data.renote && isLocalUser(user) && isRemoteUser(data.renote._user)) { if (data.renote && isLocalUser(user) && isRemoteUser(data.renote._user)) {
deliver(user, await render(), data.renote._user.inbox); deliver(user, await render(), data.renote._user.inbox);
} }
//#endergion
Promise.all(followers.map(async following => {
const follower = following._follower;
if (isLocalUser(follower)) {
// ストーキングしていない場合
if (!following.stalk) {
// この投稿が返信ならスキップ
if (note.replyId && !note._reply.userId.equals(following.followerId) && !note._reply.userId.equals(note.userId)) return;
}
// Publish event to followers stream
stream(following.followerId, 'note', noteObj);
} else {
// フォロワーがリモートユーザーかつ投稿者がローカルユーザーなら投稿を配信
if (isLocalUser(user)) {
deliver(user, await render(), follower.inbox);
}
}
}));
}
} }
// チャンネルへの投稿 // チャンネルへの投稿