From df38c2f485937d72c495d3195804830b09aa3e09 Mon Sep 17 00:00:00 2001 From: Akihiko Odaki Date: Wed, 4 Apr 2018 20:29:26 +0900 Subject: [PATCH 01/15] Extract http request from post delivery job --- src/post/distribute.ts | 96 +++++++++++++++++++++++++-- src/processor/http/deliver-post.ts | 100 ++++------------------------- 2 files changed, 104 insertions(+), 92 deletions(-) diff --git a/src/post/distribute.ts b/src/post/distribute.ts index 49c6eb22d..ad699d6b8 100644 --- a/src/post/distribute.ts +++ b/src/post/distribute.ts @@ -1,8 +1,11 @@ +import Channel from '../models/channel'; +import ChannelWatching from '../models/channel-watching'; +import Following from '../models/following'; import Mute from '../models/mute'; import Post, { pack } from '../models/post'; import Watching from '../models/post-watching'; -import User from '../models/user'; -import stream from '../publishers/stream'; +import User, { isLocalUser } from '../models/user'; +import stream, { publishChannelStream } from '../publishers/stream'; import notify from '../publishers/notify'; import pushSw from '../publishers/push-sw'; import queue from '../queue'; @@ -21,10 +24,6 @@ export default async (user, mentions, post) => { latestPost: post._id } }), - new Promise((resolve, reject) => queue.create('http', { - type: 'deliverPost', - id: post._id, - }).save(error => error ? reject(error) : resolve())), ] as Array>; function addMention(promisedMentionee, reason) { @@ -50,6 +49,91 @@ export default async (user, mentions, post) => { })); } + // タイムラインへの投稿 + if (!post.channelId) { + promises.push( + // Publish event to myself's stream + promisedPostObj.then(postObj => { + stream(post.userId, 'post', postObj); + }), + + Promise.all([ + User.findOne({ _id: post.userId }), + + // Fetch all followers + Following.aggregate([{ + $lookup: { + from: 'users', + localField: 'followerId', + foreignField: '_id', + as: 'follower' + } + }, { + $match: { + followeeId: post.userId + } + }], { + _id: false + }) + ]).then(([user, followers]) => Promise.all(followers.map(following => { + if (isLocalUser(following.follower)) { + // Publish event to followers stream + return promisedPostObj.then(postObj => { + stream(following.followerId, 'post', postObj); + }); + } + + return new Promise((resolve, reject) => { + queue.create('http', { + type: 'deliverPost', + fromId: user._id, + toId: following.followerId, + postId: post._id + }).save(error => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); + }))) + ); + } + + // チャンネルへの投稿 + if (post.channelId) { + promises.push( + // Increment channel index(posts count) + Channel.update({ _id: post.channelId }, { + $inc: { + index: 1 + } + }), + + // Publish event to channel + promisedPostObj.then(postObj => { + publishChannelStream(post.channelId, 'post', postObj); + }), + + Promise.all([ + promisedPostObj, + + // Get channel watchers + ChannelWatching.find({ + channelId: post.channelId, + // 削除されたドキュメントは除く + deletedAt: { $exists: false } + }) + ]).then(([postObj, watches]) => { + // チャンネルの視聴者(のタイムライン)に配信 + watches.forEach(w => { + stream(w.userId, 'post', postObj); + }); + }) + ); + } + // If has in reply to post if (post.replyId) { promises.push( diff --git a/src/processor/http/deliver-post.ts b/src/processor/http/deliver-post.ts index c00ab912c..48ad4f95a 100644 --- a/src/processor/http/deliver-post.ts +++ b/src/processor/http/deliver-post.ts @@ -1,93 +1,21 @@ -import Channel from '../../models/channel'; -import Following from '../../models/following'; -import ChannelWatching from '../../models/channel-watching'; -import Post, { pack } from '../../models/post'; -import User, { isLocalUser } from '../../models/user'; -import stream, { publishChannelStream } from '../../publishers/stream'; +import Post from '../../models/post'; +import User, { IRemoteUser } from '../../models/user'; import context from '../../remote/activitypub/renderer/context'; import renderCreate from '../../remote/activitypub/renderer/create'; import renderNote from '../../remote/activitypub/renderer/note'; import request from '../../remote/request'; -export default ({ data }) => Post.findOne({ _id: data.id }).then(post => { - const promisedPostObj = pack(post); - const promises = []; +export default async ({ data }) => { + const promisedTo = User.findOne({ _id: data.toId }) as Promise; + const [from, post] = await Promise.all([ + User.findOne({ _id: data.fromId }), + Post.findOne({ _id: data.postId }) + ]); + const note = await renderNote(from, post); + const to = await promisedTo; + const create = renderCreate(note); - // タイムラインへの投稿 - if (!post.channelId) { - promises.push( - // Publish event to myself's stream - promisedPostObj.then(postObj => { - stream(post.userId, 'post', postObj); - }), + create['@context'] = context; - Promise.all([ - User.findOne({ _id: post.userId }), - - // Fetch all followers - Following.aggregate([{ - $lookup: { - from: 'users', - localField: 'followerId', - foreignField: '_id', - as: 'follower' - } - }, { - $match: { - followeeId: post.userId - } - }], { - _id: false - }) - ]).then(([user, followers]) => Promise.all(followers.map(following => { - if (isLocalUser(following.follower)) { - // Publish event to followers stream - return promisedPostObj.then(postObj => { - stream(following.followerId, 'post', postObj); - }); - } - - return renderNote(user, post).then(note => { - const create = renderCreate(note); - create['@context'] = context; - return request(user, following.follower[0].account.inbox, create); - }); - }))) - ); - } - - // チャンネルへの投稿 - if (post.channelId) { - promises.push( - // Increment channel index(posts count) - Channel.update({ _id: post.channelId }, { - $inc: { - index: 1 - } - }), - - // Publish event to channel - promisedPostObj.then(postObj => { - publishChannelStream(post.channelId, 'post', postObj); - }), - - Promise.all([ - promisedPostObj, - - // Get channel watchers - ChannelWatching.find({ - channelId: post.channelId, - // 削除されたドキュメントは除く - deletedAt: { $exists: false } - }) - ]).then(([postObj, watches]) => { - // チャンネルの視聴者(のタイムライン)に配信 - watches.forEach(w => { - stream(w.userId, 'post', postObj); - }); - }) - ); - } - - return Promise.all(promises); -}); + return request(from, to.account.inbox, create); +}; From 1b6bae72c2aa19141133bbaf6939a4a5dded03b1 Mon Sep 17 00:00:00 2001 From: Akihiko Odaki Date: Wed, 4 Apr 2018 21:56:04 +0900 Subject: [PATCH 02/15] Make HTTP request first in follow processor --- src/processor/http/follow.ts | 118 +++++++++++++++++------------------ 1 file changed, 59 insertions(+), 59 deletions(-) diff --git a/src/processor/http/follow.ts b/src/processor/http/follow.ts index 8bf890efb..ed36fa18d 100644 --- a/src/processor/http/follow.ts +++ b/src/processor/http/follow.ts @@ -1,4 +1,4 @@ -import User, { isLocalUser, pack as packUser } from '../../models/user'; +import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../models/user'; import Following from '../../models/following'; import FollowingLog from '../../models/following-log'; import FollowedLog from '../../models/followed-log'; @@ -7,63 +7,63 @@ import notify from '../../publishers/notify'; import context from '../../remote/activitypub/renderer/context'; import render from '../../remote/activitypub/renderer/follow'; import request from '../../remote/request'; +import Logger from '../../utils/logger'; -export default ({ data }) => Following.findOne({ _id: data.following }).then(({ followerId, followeeId }) => { - const promisedFollower = User.findOne({ _id: followerId }); - const promisedFollowee = User.findOne({ _id: followeeId }); - - return Promise.all([ - // Increment following count - User.update(followerId, { - $inc: { - followingCount: 1 - } - }), - - promisedFollower.then(({ followingCount }) => FollowingLog.insert({ - createdAt: data.following.createdAt, - userId: followerId, - count: followingCount + 1 - })), - - // Increment followers count - User.update({ _id: followeeId }, { - $inc: { - followersCount: 1 - } - }), - - promisedFollowee.then(({ followersCount }) => FollowedLog.insert({ - createdAt: data.following.createdAt, - userId: followerId, - count: followersCount + 1 - })), - - // Notify - promisedFollowee.then(followee => followee.host === null ? - notify(followeeId, followerId, 'follow') : null), - - // Publish follow event - Promise.all([promisedFollower, promisedFollowee]).then(([follower, followee]) => { - let followerEvent; - let followeeEvent; - - if (isLocalUser(follower)) { - followerEvent = packUser(followee, follower) - .then(packed => event(follower._id, 'follow', packed)); - } - - if (isLocalUser(followee)) { - followeeEvent = packUser(follower, followee) - .then(packed => event(followee._id, 'followed', packed)); - } else if (isLocalUser(follower)) { - const rendered = render(follower, followee); - rendered['@context'] = context; - - followeeEvent = request(follower, followee.account.inbox, rendered); - } - - return Promise.all([followerEvent, followeeEvent]); - }) +export default async ({ data }) => { + const { followerId, followeeId } = await Following.findOne({ _id: data.following }); + const [follower, followee] = await Promise.all([ + User.findOne({ _id: followerId }), + User.findOne({ _id: followeeId }) ]); -}); + + if (isLocalUser(follower) && isRemoteUser(followee)) { + const rendered = render(follower, followee); + rendered['@context'] = context; + + await request(follower, followee.account.inbox, rendered); + } + + try { + await Promise.all([ + // Increment following count + User.update(followerId, { + $inc: { + followingCount: 1 + } + }), + + FollowingLog.insert({ + createdAt: data.following.createdAt, + userId: followerId, + count: follower.followingCount + 1 + }), + + // Increment followers count + User.update({ _id: followeeId }, { + $inc: { + followersCount: 1 + } + }), + + FollowedLog.insert({ + createdAt: data.following.createdAt, + userId: followerId, + count: followee.followersCount + 1 + }), + + // Publish follow event + isLocalUser(follower) && packUser(followee, follower) + .then(packed => event(follower._id, 'follow', packed)), + + isLocalUser(followee) && Promise.all([ + packUser(follower, followee) + .then(packed => event(followee._id, 'followed', packed)), + + // Notify + isLocalUser(followee) && notify(followeeId, followerId, 'follow') + ]) + ]); + } catch (error) { + Logger.error(error.toString()); + } +}; From 86b1345c17482e188be2138f0247a4b204f2abc1 Mon Sep 17 00:00:00 2001 From: Akihiko Odaki Date: Wed, 4 Apr 2018 22:05:12 +0900 Subject: [PATCH 03/15] Make HTTP request first in unfollow job --- src/processor/http/unfollow.ts | 81 ++++++++++++++++++---------------- 1 file changed, 44 insertions(+), 37 deletions(-) diff --git a/src/processor/http/unfollow.ts b/src/processor/http/unfollow.ts index d3d5f2246..fbfd7b342 100644 --- a/src/processor/http/unfollow.ts +++ b/src/processor/http/unfollow.ts @@ -1,56 +1,63 @@ import FollowedLog from '../../models/followed-log'; import Following from '../../models/following'; import FollowingLog from '../../models/following-log'; -import User, { isRemoteUser, pack as packUser } from '../../models/user'; +import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../models/user'; import stream from '../../publishers/stream'; import renderFollow from '../../remote/activitypub/renderer/follow'; import renderUndo from '../../remote/activitypub/renderer/undo'; import context from '../../remote/activitypub/renderer/context'; import request from '../../remote/request'; +import Logger from '../../utils/logger'; export default async ({ data }) => { - // Delete following - const following = await Following.findOneAndDelete({ _id: data.id }); + const following = await Following.findOne({ _id: data.id }); if (following === null) { return; } - const promisedFollower = User.findOne({ _id: following.followerId }); - const promisedFollowee = User.findOne({ _id: following.followeeId }); + const [follower, followee] = await Promise.all([ + User.findOne({ _id: following.followerId }), + User.findOne({ _id: following.followeeId }) + ]); - await Promise.all([ - // Decrement following count - User.update({ _id: following.followerId }, { $inc: { followingCount: -1 } }), - promisedFollower.then(({ followingCount }) => FollowingLog.insert({ - createdAt: new Date(), - userId: following.followerId, - count: followingCount - 1 - })), + if (isLocalUser(follower) && isRemoteUser(followee)) { + const undo = renderUndo(renderFollow(follower, followee)); + undo['@context'] = context; - // Decrement followers count - User.update({ _id: following.followeeId }, { $inc: { followersCount: -1 } }), - promisedFollowee.then(({ followersCount }) => FollowedLog.insert({ - createdAt: new Date(), - userId: following.followeeId, - count: followersCount - 1 - })), + await request(follower, followee.account.inbox, undo); + } + + try { + await Promise.all([ + // Delete following + Following.findOneAndDelete({ _id: data.id }), + + // Decrement following count + User.update({ _id: follower._id }, { $inc: { followingCount: -1 } }), + FollowingLog.insert({ + createdAt: new Date(), + userId: follower._id, + count: follower.followingCount - 1 + }), + + // Decrement followers count + User.update({ _id: followee._id }, { $inc: { followersCount: -1 } }), + FollowedLog.insert({ + createdAt: new Date(), + userId: followee._id, + count: followee.followersCount - 1 + }) + ]); + + if (isLocalUser(follower)) { + return; + } + + const promisedPackedUser = packUser(followee, follower); // Publish follow event - Promise.all([promisedFollower, promisedFollowee]).then(async ([follower, followee]) => { - if (isRemoteUser(follower)) { - return; - } - - const promisedPackedUser = packUser(followee, follower); - - if (isRemoteUser(followee)) { - const undo = renderUndo(renderFollow(follower, followee)); - undo['@context'] = context; - - await request(follower, followee.account.inbox, undo); - } - - stream(follower._id, 'unfollow', promisedPackedUser); - }) - ]); + stream(follower._id, 'unfollow', promisedPackedUser); + } catch (error) { + Logger.error(error.toString()); + } }; From d7c13b975f55c85b695b72a3ded3d5de97227414 Mon Sep 17 00:00:00 2001 From: Akihiko Odaki Date: Wed, 4 Apr 2018 22:45:55 +0900 Subject: [PATCH 04/15] Retry HTTP requests --- src/following/distribute.ts | 42 +++++++++++++++++++ src/index.ts | 2 +- src/post/distribute.ts | 4 +- src/processor/http/perform-activitypub.ts | 7 ---- src/processor/index.ts | 18 -------- src/queue.ts | 10 ----- src/queue/index.ts | 38 +++++++++++++++++ .../processors}/db/delete-post-dependents.ts | 12 +++--- .../processors}/db/index.ts | 0 .../processors}/http/deliver-post.ts | 12 +++--- .../processors}/http/follow.ts | 20 ++++----- .../processors}/http/index.ts | 0 .../processors/http/perform-activitypub.ts | 7 ++++ .../processors}/http/process-inbox.ts | 10 ++--- .../processors}/http/report-github-failure.ts | 4 +- .../processors}/http/unfollow.ts | 20 ++++----- src/remote/activitypub/act/follow.ts | 4 +- src/remote/activitypub/act/undo/unfollow.ts | 4 +- src/remote/activitypub/delete/post.ts | 4 +- src/remote/activitypub/resolve-person.ts | 4 +- src/server/activitypub/inbox.ts | 4 +- src/server/api/endpoints/following/create.ts | 4 +- src/server/api/endpoints/following/delete.ts | 4 +- src/server/api/service/github.ts | 4 +- 24 files changed, 145 insertions(+), 93 deletions(-) create mode 100644 src/following/distribute.ts delete mode 100644 src/processor/http/perform-activitypub.ts delete mode 100644 src/processor/index.ts delete mode 100644 src/queue.ts create mode 100644 src/queue/index.ts rename src/{processor => queue/processors}/db/delete-post-dependents.ts (59%) rename src/{processor => queue/processors}/db/index.ts (100%) rename src/{processor => queue/processors}/http/deliver-post.ts (55%) rename src/{processor => queue/processors}/http/follow.ts (74%) rename src/{processor => queue/processors}/http/index.ts (100%) create mode 100644 src/queue/processors/http/perform-activitypub.ts rename src/{processor => queue/processors}/http/process-inbox.ts (76%) rename src/{processor => queue/processors}/http/report-github-failure.ts (85%) rename src/{processor => queue/processors}/http/unfollow.ts (71%) diff --git a/src/following/distribute.ts b/src/following/distribute.ts new file mode 100644 index 000000000..10ff98881 --- /dev/null +++ b/src/following/distribute.ts @@ -0,0 +1,42 @@ +import User, { pack as packUser } from '../models/user'; +import FollowingLog from '../models/following-log'; +import FollowedLog from '../models/followed-log'; +import event from '../publishers/stream'; +import notify from '../publishers/notify'; + +export default async (follower, followee) => Promise.all([ + // Increment following count + User.update(follower._id, { + $inc: { + followingCount: 1 + } + }), + + FollowingLog.insert({ + createdAt: new Date(), + userId: followee._id, + count: follower.followingCount + 1 + }), + + // Increment followers count + User.update({ _id: followee._id }, { + $inc: { + followersCount: 1 + } + }), + + FollowedLog.insert({ + createdAt: new Date(), + userId: follower._id, + count: followee.followersCount + 1 + }), + + followee.host === null && Promise.all([ + // Notify + notify(followee.id, follower.id, 'follow'), + + // Publish follow event + packUser(follower, followee) + .then(packed => event(followee._id, 'followed', packed)) + ]) +]); diff --git a/src/index.ts b/src/index.ts index 29c4f3431..21fb2f553 100644 --- a/src/index.ts +++ b/src/index.ts @@ -99,7 +99,7 @@ async function workerMain(opt) { if (!opt['only-server']) { // start processor - require('./processor').default(); + require('./queue').process(); } // Send a 'ready' message to parent process diff --git a/src/post/distribute.ts b/src/post/distribute.ts index ad699d6b8..f748a620c 100644 --- a/src/post/distribute.ts +++ b/src/post/distribute.ts @@ -8,7 +8,7 @@ import User, { isLocalUser } from '../models/user'; import stream, { publishChannelStream } from '../publishers/stream'; import notify from '../publishers/notify'; import pushSw from '../publishers/push-sw'; -import queue from '../queue'; +import { createHttp } from '../queue'; import watch from './watch'; export default async (user, mentions, post) => { @@ -84,7 +84,7 @@ export default async (user, mentions, post) => { } return new Promise((resolve, reject) => { - queue.create('http', { + createHttp({ type: 'deliverPost', fromId: user._id, toId: following.followerId, diff --git a/src/processor/http/perform-activitypub.ts b/src/processor/http/perform-activitypub.ts deleted file mode 100644 index 963e532fe..000000000 --- a/src/processor/http/perform-activitypub.ts +++ /dev/null @@ -1,7 +0,0 @@ -import User from '../../models/user'; -import act from '../../remote/activitypub/act'; -import Resolver from '../../remote/activitypub/resolver'; - -export default ({ data }) => User.findOne({ _id: data.actor }) - .then(actor => act(new Resolver(), actor, data.outbox)) - .then(Promise.all); diff --git a/src/processor/index.ts b/src/processor/index.ts deleted file mode 100644 index 172048dda..000000000 --- a/src/processor/index.ts +++ /dev/null @@ -1,18 +0,0 @@ -import queue from '../queue'; -import db from './db'; -import http from './http'; - -export default () => { - queue.process('db', db); - - /* - 256 is the default concurrency limit of Mozilla Firefox and Google - Chromium. - - a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google - https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff - Network.http.max-connections - MozillaZine Knowledge Base - http://kb.mozillazine.org/Network.http.max-connections - */ - queue.process('http', 256, http); -}; diff --git a/src/queue.ts b/src/queue.ts deleted file mode 100644 index 08ea13c2a..000000000 --- a/src/queue.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { createQueue } from 'kue'; -import config from './config'; - -export default createQueue({ - redis: { - port: config.redis.port, - host: config.redis.host, - auth: config.redis.pass - } -}); diff --git a/src/queue/index.ts b/src/queue/index.ts new file mode 100644 index 000000000..f90754a56 --- /dev/null +++ b/src/queue/index.ts @@ -0,0 +1,38 @@ +import { createQueue } from 'kue'; +import config from '../config'; +import db from './processors/db'; +import http from './processors/http'; + +const queue = createQueue({ + redis: { + port: config.redis.port, + host: config.redis.host, + auth: config.redis.pass + } +}); + +export function createHttp(data) { + return queue + .create('http', data) + .attempts(16) + .backoff({ delay: 16384, type: 'exponential' }); +} + +export function createDb(data) { + return queue.create('db', data); +} + +export function process() { + queue.process('db', db); + + /* + 256 is the default concurrency limit of Mozilla Firefox and Google + Chromium. + + a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google + https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff + Network.http.max-connections - MozillaZine Knowledge Base + http://kb.mozillazine.org/Network.http.max-connections + */ + queue.process('http', 256, http); +} diff --git a/src/processor/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts similarity index 59% rename from src/processor/db/delete-post-dependents.ts rename to src/queue/processors/db/delete-post-dependents.ts index 879c41ec9..6de21eb05 100644 --- a/src/processor/db/delete-post-dependents.ts +++ b/src/queue/processors/db/delete-post-dependents.ts @@ -1,9 +1,9 @@ -import Favorite from '../../models/favorite'; -import Notification from '../../models/notification'; -import PollVote from '../../models/poll-vote'; -import PostReaction from '../../models/post-reaction'; -import PostWatching from '../../models/post-watching'; -import Post from '../../models/post'; +import Favorite from '../../../models/favorite'; +import Notification from '../../../models/notification'; +import PollVote from '../../../models/poll-vote'; +import PostReaction from '../../../models/post-reaction'; +import PostWatching from '../../../models/post-watching'; +import Post from '../../../models/post'; export default async ({ data }) => Promise.all([ Favorite.remove({ postId: data._id }), diff --git a/src/processor/db/index.ts b/src/queue/processors/db/index.ts similarity index 100% rename from src/processor/db/index.ts rename to src/queue/processors/db/index.ts diff --git a/src/processor/http/deliver-post.ts b/src/queue/processors/http/deliver-post.ts similarity index 55% rename from src/processor/http/deliver-post.ts rename to src/queue/processors/http/deliver-post.ts index 48ad4f95a..e743fc5f6 100644 --- a/src/processor/http/deliver-post.ts +++ b/src/queue/processors/http/deliver-post.ts @@ -1,9 +1,9 @@ -import Post from '../../models/post'; -import User, { IRemoteUser } from '../../models/user'; -import context from '../../remote/activitypub/renderer/context'; -import renderCreate from '../../remote/activitypub/renderer/create'; -import renderNote from '../../remote/activitypub/renderer/note'; -import request from '../../remote/request'; +import Post from '../../../models/post'; +import User, { IRemoteUser } from '../../../models/user'; +import context from '../../../remote/activitypub/renderer/context'; +import renderCreate from '../../../remote/activitypub/renderer/create'; +import renderNote from '../../../remote/activitypub/renderer/note'; +import request from '../../../remote/request'; export default async ({ data }) => { const promisedTo = User.findOne({ _id: data.toId }) as Promise; diff --git a/src/processor/http/follow.ts b/src/queue/processors/http/follow.ts similarity index 74% rename from src/processor/http/follow.ts rename to src/queue/processors/http/follow.ts index ed36fa18d..4cb72828e 100644 --- a/src/processor/http/follow.ts +++ b/src/queue/processors/http/follow.ts @@ -1,13 +1,13 @@ -import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../models/user'; -import Following from '../../models/following'; -import FollowingLog from '../../models/following-log'; -import FollowedLog from '../../models/followed-log'; -import event from '../../publishers/stream'; -import notify from '../../publishers/notify'; -import context from '../../remote/activitypub/renderer/context'; -import render from '../../remote/activitypub/renderer/follow'; -import request from '../../remote/request'; -import Logger from '../../utils/logger'; +import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user'; +import Following from '../../../models/following'; +import FollowingLog from '../../../models/following-log'; +import FollowedLog from '../../../models/followed-log'; +import event from '../../../publishers/stream'; +import notify from '../../../publishers/notify'; +import context from '../../../remote/activitypub/renderer/context'; +import render from '../../../remote/activitypub/renderer/follow'; +import request from '../../../remote/request'; +import Logger from '../../../utils/logger'; export default async ({ data }) => { const { followerId, followeeId } = await Following.findOne({ _id: data.following }); diff --git a/src/processor/http/index.ts b/src/queue/processors/http/index.ts similarity index 100% rename from src/processor/http/index.ts rename to src/queue/processors/http/index.ts diff --git a/src/queue/processors/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts new file mode 100644 index 000000000..7b84400d5 --- /dev/null +++ b/src/queue/processors/http/perform-activitypub.ts @@ -0,0 +1,7 @@ +import User from '../../../models/user'; +import act from '../../../remote/activitypub/act'; +import Resolver from '../../../remote/activitypub/resolver'; + +export default ({ data }) => User.findOne({ _id: data.actor }) + .then(actor => act(new Resolver(), actor, data.outbox)) + .then(Promise.all); diff --git a/src/processor/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts similarity index 76% rename from src/processor/http/process-inbox.ts rename to src/queue/processors/http/process-inbox.ts index f102f8d6b..de1dbd2f9 100644 --- a/src/processor/http/process-inbox.ts +++ b/src/queue/processors/http/process-inbox.ts @@ -1,9 +1,9 @@ import { verifySignature } from 'http-signature'; -import parseAcct from '../../acct/parse'; -import User, { IRemoteUser } from '../../models/user'; -import act from '../../remote/activitypub/act'; -import resolvePerson from '../../remote/activitypub/resolve-person'; -import Resolver from '../../remote/activitypub/resolver'; +import parseAcct from '../../../acct/parse'; +import User, { IRemoteUser } from '../../../models/user'; +import act from '../../../remote/activitypub/act'; +import resolvePerson from '../../../remote/activitypub/resolve-person'; +import Resolver from '../../../remote/activitypub/resolver'; export default async ({ data }): Promise => { const keyIdLower = data.signature.keyId.toLowerCase(); diff --git a/src/processor/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts similarity index 85% rename from src/processor/http/report-github-failure.ts rename to src/queue/processors/http/report-github-failure.ts index 4f6f5ccee..21683ba3c 100644 --- a/src/processor/http/report-github-failure.ts +++ b/src/queue/processors/http/report-github-failure.ts @@ -1,6 +1,6 @@ import * as request from 'request-promise-native'; -import User from '../../models/user'; -const createPost = require('../../server/api/endpoints/posts/create'); +import User from '../../../models/user'; +const createPost = require('../../../server/api/endpoints/posts/create'); export default async ({ data }) => { const asyncBot = User.findOne({ _id: data.userId }); diff --git a/src/processor/http/unfollow.ts b/src/queue/processors/http/unfollow.ts similarity index 71% rename from src/processor/http/unfollow.ts rename to src/queue/processors/http/unfollow.ts index fbfd7b342..801a3612a 100644 --- a/src/processor/http/unfollow.ts +++ b/src/queue/processors/http/unfollow.ts @@ -1,13 +1,13 @@ -import FollowedLog from '../../models/followed-log'; -import Following from '../../models/following'; -import FollowingLog from '../../models/following-log'; -import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../models/user'; -import stream from '../../publishers/stream'; -import renderFollow from '../../remote/activitypub/renderer/follow'; -import renderUndo from '../../remote/activitypub/renderer/undo'; -import context from '../../remote/activitypub/renderer/context'; -import request from '../../remote/request'; -import Logger from '../../utils/logger'; +import FollowedLog from '../../../models/followed-log'; +import Following from '../../../models/following'; +import FollowingLog from '../../../models/following-log'; +import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user'; +import stream from '../../../publishers/stream'; +import renderFollow from '../../../remote/activitypub/renderer/follow'; +import renderUndo from '../../../remote/activitypub/renderer/undo'; +import context from '../../../remote/activitypub/renderer/context'; +import request from '../../../remote/request'; +import Logger from '../../../utils/logger'; export default async ({ data }) => { const following = await Following.findOne({ _id: data.id }); diff --git a/src/remote/activitypub/act/follow.ts b/src/remote/activitypub/act/follow.ts index 23fa41df8..222a257e1 100644 --- a/src/remote/activitypub/act/follow.ts +++ b/src/remote/activitypub/act/follow.ts @@ -3,7 +3,7 @@ import parseAcct from '../../../acct/parse'; import Following, { IFollowing } from '../../../models/following'; import User from '../../../models/user'; import config from '../../../config'; -import queue from '../../../queue'; +import { createHttp } from '../../../queue'; import context from '../renderer/context'; import renderAccept from '../renderer/accept'; import request from '../../request'; @@ -44,7 +44,7 @@ export default async (resolver: Resolver, actor, activity, distribute) => { followerId: actor._id, followeeId: followee._id }).then(following => new Promise((resolve, reject) => { - queue.create('http', { + createHttp({ type: 'follow', following: following._id }).save(error => { diff --git a/src/remote/activitypub/act/undo/unfollow.ts b/src/remote/activitypub/act/undo/unfollow.ts index c17e06e8a..4f15d9a3e 100644 --- a/src/remote/activitypub/act/undo/unfollow.ts +++ b/src/remote/activitypub/act/undo/unfollow.ts @@ -1,7 +1,7 @@ -import queue from '../../../../queue'; +import { createHttp } from '../../../../queue'; export default ({ $id }) => new Promise((resolve, reject) => { - queue.create('http', { type: 'unfollow', id: $id }).save(error => { + createHttp({ type: 'unfollow', id: $id }).save(error => { if (error) { reject(error); } else { diff --git a/src/remote/activitypub/delete/post.ts b/src/remote/activitypub/delete/post.ts index f6c816647..59ae8c2b9 100644 --- a/src/remote/activitypub/delete/post.ts +++ b/src/remote/activitypub/delete/post.ts @@ -1,10 +1,10 @@ import Post from '../../../models/post'; -import queue from '../../../queue'; +import { createDb } from '../../../queue'; export default async ({ $id }) => { const promisedDeletion = Post.findOneAndDelete({ _id: $id }); - await new Promise((resolve, reject) => queue.create('db', { + await new Promise((resolve, reject) => createDb({ type: 'deletePostDependents', id: $id }).delay(65536).save(error => error ? reject(error) : resolve())); diff --git a/src/remote/activitypub/resolve-person.ts b/src/remote/activitypub/resolve-person.ts index 59be65908..2cf3ad32d 100644 --- a/src/remote/activitypub/resolve-person.ts +++ b/src/remote/activitypub/resolve-person.ts @@ -1,7 +1,7 @@ import { JSDOM } from 'jsdom'; import { toUnicode } from 'punycode'; import User, { validateUsername, isValidName, isValidDescription } from '../../models/user'; -import queue from '../../queue'; +import { createHttp } from '../../queue'; import webFinger from '../webfinger'; import create from './create'; import Resolver from './resolver'; @@ -69,7 +69,7 @@ export default async (value, verifier?: string) => { }, }); - queue.create('http', { + createHttp({ type: 'performActivityPub', actor: user._id, outbox diff --git a/src/server/activitypub/inbox.ts b/src/server/activitypub/inbox.ts index 5de843385..0907823b2 100644 --- a/src/server/activitypub/inbox.ts +++ b/src/server/activitypub/inbox.ts @@ -1,7 +1,7 @@ import * as bodyParser from 'body-parser'; import * as express from 'express'; import { parseRequest } from 'http-signature'; -import queue from '../../queue'; +import { createHttp } from '../../queue'; const app = express(); @@ -22,7 +22,7 @@ app.post('/@:user/inbox', bodyParser.json({ return res.sendStatus(401); } - queue.create('http', { + createHttp({ type: 'processInbox', inbox: req.body, signature, diff --git a/src/server/api/endpoints/following/create.ts b/src/server/api/endpoints/following/create.ts index e56859521..9ccbe2017 100644 --- a/src/server/api/endpoints/following/create.ts +++ b/src/server/api/endpoints/following/create.ts @@ -4,7 +4,7 @@ import $ from 'cafy'; import User from '../../../../models/user'; import Following from '../../../../models/following'; -import queue from '../../../../queue'; +import { createHttp } from '../../../../queue'; /** * Follow a user @@ -56,7 +56,7 @@ module.exports = (params, user) => new Promise(async (res, rej) => { followeeId: followee._id }); - queue.create('http', { type: 'follow', following: _id }).save(); + createHttp({ type: 'follow', following: _id }).save(); // Send response res(); diff --git a/src/server/api/endpoints/following/delete.ts b/src/server/api/endpoints/following/delete.ts index bf21bf0cb..0684b8750 100644 --- a/src/server/api/endpoints/following/delete.ts +++ b/src/server/api/endpoints/following/delete.ts @@ -4,7 +4,7 @@ import $ from 'cafy'; import User from '../../../../models/user'; import Following from '../../../../models/following'; -import queue from '../../../../queue'; +import { createHttp } from '../../../../queue'; /** * Unfollow a user @@ -49,7 +49,7 @@ module.exports = (params, user) => new Promise(async (res, rej) => { return rej('already not following'); } - queue.create('http', { + createHttp({ type: 'unfollow', id: exist._id }).save(error => { diff --git a/src/server/api/service/github.ts b/src/server/api/service/github.ts index 4fd59c2a9..5fc4a92f5 100644 --- a/src/server/api/service/github.ts +++ b/src/server/api/service/github.ts @@ -3,7 +3,7 @@ import * as express from 'express'; //const crypto = require('crypto'); import User from '../../../models/user'; import config from '../../../config'; -import queue from '../../../queue'; +import { createHttp } from '../../../queue'; module.exports = async (app: express.Application) => { if (config.github_bot == null) return; @@ -42,7 +42,7 @@ module.exports = async (app: express.Application) => { const commit = event.commit; const parent = commit.parents[0]; - queue.create('http', { + createHttp({ type: 'gitHubFailureReport', userId: bot._id, parentUrl: parent.url, From e330ac1934516807757afe2d2760fa21b27006e6 Mon Sep 17 00:00:00 2001 From: Akihiko Odaki Date: Thu, 5 Apr 2018 01:04:44 +0900 Subject: [PATCH 05/15] Let unhandled rejection handler handle rejections in jobs --- .../processors/db/delete-post-dependents.ts | 4 +- src/queue/processors/db/index.ts | 2 +- src/queue/processors/http/deliver-post.ts | 28 ++++--- src/queue/processors/http/follow.ts | 79 +++++++++---------- src/queue/processors/http/index.ts | 2 +- .../processors/http/perform-activitypub.ts | 5 +- src/queue/processors/http/process-inbox.ts | 55 +++++++------ .../processors/http/report-github-failure.ts | 39 +++++---- src/queue/processors/http/unfollow.ts | 31 +++++--- 9 files changed, 134 insertions(+), 111 deletions(-) diff --git a/src/queue/processors/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts index 6de21eb05..fb6617e95 100644 --- a/src/queue/processors/db/delete-post-dependents.ts +++ b/src/queue/processors/db/delete-post-dependents.ts @@ -5,7 +5,7 @@ import PostReaction from '../../../models/post-reaction'; import PostWatching from '../../../models/post-watching'; import Post from '../../../models/post'; -export default async ({ data }) => Promise.all([ +export default ({ data }, done) => Promise.all([ Favorite.remove({ postId: data._id }), Notification.remove({ postId: data._id }), PollVote.remove({ postId: data._id }), @@ -19,4 +19,4 @@ export default async ({ data }) => Promise.all([ }), Post.remove({ repostId: data._id }) ])) -]); +]).then(() => done(), done); diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts index 75838c099..468ec442a 100644 --- a/src/queue/processors/db/index.ts +++ b/src/queue/processors/db/index.ts @@ -4,4 +4,4 @@ const handlers = { deletePostDependents }; -export default (job, done) => handlers[job.data.type](job).then(() => done(), done); +export default (job, done) => handlers[job.data.type](job, done); diff --git a/src/queue/processors/http/deliver-post.ts b/src/queue/processors/http/deliver-post.ts index e743fc5f6..8107c8bf7 100644 --- a/src/queue/processors/http/deliver-post.ts +++ b/src/queue/processors/http/deliver-post.ts @@ -5,17 +5,23 @@ import renderCreate from '../../../remote/activitypub/renderer/create'; import renderNote from '../../../remote/activitypub/renderer/note'; import request from '../../../remote/request'; -export default async ({ data }) => { - const promisedTo = User.findOne({ _id: data.toId }) as Promise; - const [from, post] = await Promise.all([ - User.findOne({ _id: data.fromId }), - Post.findOne({ _id: data.postId }) - ]); - const note = await renderNote(from, post); - const to = await promisedTo; - const create = renderCreate(note); +export default async ({ data }, done) => { + try { + const promisedTo = User.findOne({ _id: data.toId }) as Promise; + const [from, post] = await Promise.all([ + User.findOne({ _id: data.fromId }), + Post.findOne({ _id: data.postId }) + ]); + const note = await renderNote(from, post); + const to = await promisedTo; + const create = renderCreate(note); - create['@context'] = context; + create['@context'] = context; - return request(from, to.account.inbox, create); + await request(from, to.account.inbox, create); + } catch (error) { + done(error); + } + + done(); }; diff --git a/src/queue/processors/http/follow.ts b/src/queue/processors/http/follow.ts index 4cb72828e..ba1cc3118 100644 --- a/src/queue/processors/http/follow.ts +++ b/src/queue/processors/http/follow.ts @@ -7,10 +7,8 @@ import notify from '../../../publishers/notify'; import context from '../../../remote/activitypub/renderer/context'; import render from '../../../remote/activitypub/renderer/follow'; import request from '../../../remote/request'; -import Logger from '../../../utils/logger'; -export default async ({ data }) => { - const { followerId, followeeId } = await Following.findOne({ _id: data.following }); +export default ({ data }, done) => Following.findOne({ _id: data.following }).then(async ({ followerId, followeeId }) => { const [follower, followee] = await Promise.all([ User.findOne({ _id: followerId }), User.findOne({ _id: followeeId }) @@ -23,47 +21,46 @@ export default async ({ data }) => { await request(follower, followee.account.inbox, rendered); } - try { - await Promise.all([ - // Increment following count - User.update(followerId, { - $inc: { - followingCount: 1 - } - }), + return [follower, followee]; +}).then(([follower, followee]) => Promise.all([ + // Increment following count + User.update(follower._id, { + $inc: { + followingCount: 1 + } + }), - FollowingLog.insert({ - createdAt: data.following.createdAt, - userId: followerId, - count: follower.followingCount + 1 - }), + FollowingLog.insert({ + createdAt: data.following.createdAt, + userId: follower._id, + count: follower.followingCount + 1 + }), - // Increment followers count - User.update({ _id: followeeId }, { - $inc: { - followersCount: 1 - } - }), + // Increment followers count + User.update({ _id: followee._id }, { + $inc: { + followersCount: 1 + } + }), - FollowedLog.insert({ - createdAt: data.following.createdAt, - userId: followerId, - count: followee.followersCount + 1 - }), + FollowedLog.insert({ + createdAt: data.following.createdAt, + userId: follower._id, + count: followee.followersCount + 1 + }), - // Publish follow event - isLocalUser(follower) && packUser(followee, follower) - .then(packed => event(follower._id, 'follow', packed)), + // Publish follow event + isLocalUser(follower) && packUser(followee, follower) + .then(packed => event(follower._id, 'follow', packed)), - isLocalUser(followee) && Promise.all([ - packUser(follower, followee) - .then(packed => event(followee._id, 'followed', packed)), + isLocalUser(followee) && Promise.all([ + packUser(follower, followee) + .then(packed => event(followee._id, 'followed', packed)), - // Notify - isLocalUser(followee) && notify(followeeId, followerId, 'follow') - ]) - ]); - } catch (error) { - Logger.error(error.toString()); - } -}; + // Notify + isLocalUser(followee) && notify(followee._id, follower._id, 'follow') + ]) +]).then(() => done(), error => { + done(); + throw error; +}), done); diff --git a/src/queue/processors/http/index.ts b/src/queue/processors/http/index.ts index 8f9aa717c..0ea79305c 100644 --- a/src/queue/processors/http/index.ts +++ b/src/queue/processors/http/index.ts @@ -14,4 +14,4 @@ const handlers = { unfollow }; -export default (job, done) => handlers[job.data.type](job).then(() => done(), done); +export default (job, done) => handlers[job.data.type](job, done); diff --git a/src/queue/processors/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts index 7b84400d5..ae70c0f0b 100644 --- a/src/queue/processors/http/perform-activitypub.ts +++ b/src/queue/processors/http/perform-activitypub.ts @@ -2,6 +2,7 @@ import User from '../../../models/user'; import act from '../../../remote/activitypub/act'; import Resolver from '../../../remote/activitypub/resolver'; -export default ({ data }) => User.findOne({ _id: data.actor }) +export default ({ data }, done) => User.findOne({ _id: data.actor }) .then(actor => act(new Resolver(), actor, data.outbox)) - .then(Promise.all); + .then(Promise.all) + .then(() => done(), done); diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts index de1dbd2f9..88fbb9737 100644 --- a/src/queue/processors/http/process-inbox.ts +++ b/src/queue/processors/http/process-inbox.ts @@ -5,35 +5,40 @@ import act from '../../../remote/activitypub/act'; import resolvePerson from '../../../remote/activitypub/resolve-person'; import Resolver from '../../../remote/activitypub/resolver'; -export default async ({ data }): Promise => { - const keyIdLower = data.signature.keyId.toLowerCase(); - let user; +export default async ({ data }, done) => { + try { + const keyIdLower = data.signature.keyId.toLowerCase(); + let user; - if (keyIdLower.startsWith('acct:')) { - const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); - if (host === null) { - throw 'request was made by local user'; + if (keyIdLower.startsWith('acct:')) { + const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); + if (host === null) { + done(); + return; + } + + user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser; + } else { + user = await User.findOne({ + host: { $ne: null }, + 'account.publicKey.id': data.signature.keyId + }) as IRemoteUser; + + if (user === null) { + user = await resolvePerson(data.signature.keyId); + } } - user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser; - } else { - user = await User.findOne({ - host: { $ne: null }, - 'account.publicKey.id': data.signature.keyId - }) as IRemoteUser; - - if (user === null) { - user = await resolvePerson(data.signature.keyId); + if (user === null || !verifySignature(data.signature, user.account.publicKey.publicKeyPem)) { + done(); + return; } + + await Promise.all(await act(new Resolver(), user, data.inbox, true)); + } catch (error) { + done(error); + return; } - if (user === null) { - throw 'failed to resolve user'; - } - - if (!verifySignature(data.signature, user.account.publicKey.publicKeyPem)) { - throw 'signature verification failed'; - } - - await Promise.all(await act(new Resolver(), user, data.inbox, true)); + done(); }; diff --git a/src/queue/processors/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts index 21683ba3c..af9659bda 100644 --- a/src/queue/processors/http/report-github-failure.ts +++ b/src/queue/processors/http/report-github-failure.ts @@ -2,23 +2,30 @@ import * as request from 'request-promise-native'; import User from '../../../models/user'; const createPost = require('../../../server/api/endpoints/posts/create'); -export default async ({ data }) => { - const asyncBot = User.findOne({ _id: data.userId }); +export default async ({ data }, done) => { + try { + const asyncBot = User.findOne({ _id: data.userId }); - // Fetch parent status - const parentStatuses = await request({ - url: `${data.parentUrl}/statuses`, - headers: { - 'User-Agent': 'misskey' - }, - json: true - }); + // Fetch parent status + const parentStatuses = await request({ + url: `${data.parentUrl}/statuses`, + headers: { + 'User-Agent': 'misskey' + }, + json: true + }); - const parentState = parentStatuses[0].state; - const stillFailed = parentState == 'failure' || parentState == 'error'; - const text = stillFailed ? - `**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` : - `**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`; + const parentState = parentStatuses[0].state; + const stillFailed = parentState == 'failure' || parentState == 'error'; + const text = stillFailed ? + `**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` : + `**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`; - createPost({ text }, await asyncBot); + createPost({ text }, await asyncBot); + } catch (error) { + done(error); + return; + } + + done(); }; diff --git a/src/queue/processors/http/unfollow.ts b/src/queue/processors/http/unfollow.ts index 801a3612a..dc50e946c 100644 --- a/src/queue/processors/http/unfollow.ts +++ b/src/queue/processors/http/unfollow.ts @@ -7,24 +7,31 @@ import renderFollow from '../../../remote/activitypub/renderer/follow'; import renderUndo from '../../../remote/activitypub/renderer/undo'; import context from '../../../remote/activitypub/renderer/context'; import request from '../../../remote/request'; -import Logger from '../../../utils/logger'; -export default async ({ data }) => { +export default async ({ data }, done) => { const following = await Following.findOne({ _id: data.id }); if (following === null) { + done(); return; } - const [follower, followee] = await Promise.all([ - User.findOne({ _id: following.followerId }), - User.findOne({ _id: following.followeeId }) - ]); + let follower, followee; - if (isLocalUser(follower) && isRemoteUser(followee)) { - const undo = renderUndo(renderFollow(follower, followee)); - undo['@context'] = context; + try { + [follower, followee] = await Promise.all([ + User.findOne({ _id: following.followerId }), + User.findOne({ _id: following.followeeId }) + ]); - await request(follower, followee.account.inbox, undo); + if (isLocalUser(follower) && isRemoteUser(followee)) { + const undo = renderUndo(renderFollow(follower, followee)); + undo['@context'] = context; + + await request(follower, followee.account.inbox, undo); + } + } catch (error) { + done(error); + return; } try { @@ -57,7 +64,7 @@ export default async ({ data }) => { // Publish follow event stream(follower._id, 'unfollow', promisedPackedUser); - } catch (error) { - Logger.error(error.toString()); + } finally { + done(); } }; From a5715ecc1b73d3e3a950c392fa3a466dee810248 Mon Sep 17 00:00:00 2001 From: Akihiko Odaki Date: Thu, 5 Apr 2018 01:07:55 +0900 Subject: [PATCH 06/15] Do not declare two variables in a statement --- src/queue/processors/http/unfollow.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/queue/processors/http/unfollow.ts b/src/queue/processors/http/unfollow.ts index dc50e946c..d62eb280d 100644 --- a/src/queue/processors/http/unfollow.ts +++ b/src/queue/processors/http/unfollow.ts @@ -15,7 +15,8 @@ export default async ({ data }, done) => { return; } - let follower, followee; + let follower; + let followee; try { [follower, followee] = await Promise.all([ From 168b0730b46fd283b900b553dd2eede2aa4c7dac Mon Sep 17 00:00:00 2001 From: Akihiko Odaki Date: Thu, 5 Apr 2018 01:24:39 +0900 Subject: [PATCH 07/15] Implement Mention object --- src/post/create.ts | 14 ++------------ src/queue/processors/http/process-inbox.ts | 2 +- src/remote/activitypub/create.ts | 12 +++++++++++- src/remote/activitypub/resolve-person.ts | 5 ++--- src/remote/resolve-user.ts | 3 ++- src/server/api/endpoints/posts/create.ts | 10 +++++++--- 6 files changed, 25 insertions(+), 21 deletions(-) diff --git a/src/post/create.ts b/src/post/create.ts index ecea37382..4ad1503e0 100644 --- a/src/post/create.ts +++ b/src/post/create.ts @@ -1,8 +1,6 @@ -import parseAcct from '../acct/parse'; import Post from '../models/post'; -import User from '../models/user'; -export default async (post, reply, repost, atMentions) => { +export default async (post, reply, repost, mentions) => { post.mentions = []; function addMention(mentionee) { @@ -36,15 +34,7 @@ export default async (post, reply, repost, atMentions) => { post._repost = null; } - await Promise.all(atMentions.map(async mention => { - // Fetch mentioned user - // SELECT _id - const { _id } = await User - .findOne(parseAcct(mention), { _id: true }); - - // Add mention - addMention(_id); - })); + await Promise.all(mentions.map(({ _id }) => addMention(_id))); return Post.insert(post); }; diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts index 88fbb9737..7eeaa19f8 100644 --- a/src/queue/processors/http/process-inbox.ts +++ b/src/queue/processors/http/process-inbox.ts @@ -25,7 +25,7 @@ export default async ({ data }, done) => { }) as IRemoteUser; if (user === null) { - user = await resolvePerson(data.signature.keyId); + user = await resolvePerson(new Resolver(), data.signature.keyId); } } diff --git a/src/remote/activitypub/create.ts b/src/remote/activitypub/create.ts index 97c72860f..710d56fd3 100644 --- a/src/remote/activitypub/create.ts +++ b/src/remote/activitypub/create.ts @@ -7,6 +7,7 @@ import { IRemoteUser } from '../../models/user'; import uploadFromUrl from '../../drive/upload-from-url'; import createPost from '../../post/create'; import distributePost from '../../post/distribute'; +import resolvePerson from './resolve-person'; import Resolver from './resolver'; const createDOMPurify = require('dompurify'); @@ -53,6 +54,15 @@ class Creator { .map(({ object }) => object.$id); const { window } = new JSDOM(note.content); + const mentions = []; + + for (const { href, type } of note.tags) { + switch (type) { + case 'Mention': + mentions.push(resolvePerson(resolver, href)); + break; + } + } const inserted = await createPost({ channelId: undefined, @@ -69,7 +79,7 @@ class Creator { viaMobile: false, geo: undefined, uri: note.id - }, null, null, []); + }, null, null, await Promise.all(mentions)); const promises = []; diff --git a/src/remote/activitypub/resolve-person.ts b/src/remote/activitypub/resolve-person.ts index 2cf3ad32d..7ed01e322 100644 --- a/src/remote/activitypub/resolve-person.ts +++ b/src/remote/activitypub/resolve-person.ts @@ -4,14 +4,13 @@ import User, { validateUsername, isValidName, isValidDescription } from '../../m import { createHttp } from '../../queue'; import webFinger from '../webfinger'; import create from './create'; -import Resolver from './resolver'; async function isCollection(collection) { return ['Collection', 'OrderedCollection'].includes(collection.type); } -export default async (value, verifier?: string) => { - const { resolver, object } = await new Resolver().resolveOne(value); +export default async (parentResolver, value, verifier?: string) => { + const { resolver, object } = parentResolver.resolveOne(value); if ( object === null || diff --git a/src/remote/resolve-user.ts b/src/remote/resolve-user.ts index 48219e8cb..097ed6673 100644 --- a/src/remote/resolve-user.ts +++ b/src/remote/resolve-user.ts @@ -1,6 +1,7 @@ import { toUnicode, toASCII } from 'punycode'; import User from '../models/user'; import resolvePerson from './activitypub/resolve-person'; +import Resolver from './activitypub/resolver'; import webFinger from './webfinger'; export default async (username, host, option) => { @@ -19,7 +20,7 @@ export default async (username, host, option) => { throw new Error(); } - user = await resolvePerson(self.href, acctLower); + user = await resolvePerson(new Resolver(), self.href, acctLower); } return user; diff --git a/src/server/api/endpoints/posts/create.ts b/src/server/api/endpoints/posts/create.ts index 03af7ee76..47897626f 100644 --- a/src/server/api/endpoints/posts/create.ts +++ b/src/server/api/endpoints/posts/create.ts @@ -3,12 +3,13 @@ */ import $ from 'cafy'; import deepEqual = require('deep-equal'); +import parseAcct from '../../../../acct/parse'; import renderAcct from '../../../../acct/render'; import config from '../../../../config'; import html from '../../../../text/html'; import parse from '../../../../text/parse'; import Post, { IPost, isValidText, isValidCw } from '../../../../models/post'; -import { ILocalUser } from '../../../../models/user'; +import User, { ILocalUser } from '../../../../models/user'; import Channel, { IChannel } from '../../../../models/channel'; import DriveFile from '../../../../models/drive-file'; import create from '../../../../post/create'; @@ -267,7 +268,10 @@ module.exports = (params, user: ILocalUser, app) => new Promise(async (res, rej) .filter(t => t.type == 'mention') .map(renderAcct) // Drop dupulicates - .filter((v, i, s) => s.indexOf(v) == i); + .filter((v, i, s) => s.indexOf(v) == i) + // Fetch mentioned user + // SELECT _id + .map(mention => User.findOne(parseAcct(mention), { _id: true })); } // 投稿を作成 @@ -286,7 +290,7 @@ module.exports = (params, user: ILocalUser, app) => new Promise(async (res, rej) viaMobile: viaMobile, visibility, geo - }, reply, repost, atMentions); + }, reply, repost, await Promise.all(atMentions)); const postObj = await distribute(user, post.mentions, post); From f44a7389e2b67e524c953843fcb4e007349c76c2 Mon Sep 17 00:00:00 2001 From: Akihiko Odaki Date: Thu, 5 Apr 2018 01:29:56 +0900 Subject: [PATCH 08/15] Implement Hashtag object --- src/remote/activitypub/create.ts | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/remote/activitypub/create.ts b/src/remote/activitypub/create.ts index 710d56fd3..31e9dba86 100644 --- a/src/remote/activitypub/create.ts +++ b/src/remote/activitypub/create.ts @@ -55,9 +55,16 @@ class Creator { const { window } = new JSDOM(note.content); const mentions = []; + const tags = []; - for (const { href, type } of note.tags) { + for (const { href, name, type } of note.tags) { switch (type) { + case 'Hashtag': + if (name.startsWith('#')) { + tags.push(name.slice(1)); + } + break; + case 'Mention': mentions.push(resolvePerson(resolver, href)); break; @@ -78,7 +85,8 @@ class Creator { appId: null, viaMobile: false, geo: undefined, - uri: note.id + uri: note.id, + tags }, null, null, await Promise.all(mentions)); const promises = []; From ced6b9a76f71e95a376dc4cb37b6b5617a0321d9 Mon Sep 17 00:00:00 2001 From: Akihiko Odaki Date: Thu, 5 Apr 2018 01:57:18 +0900 Subject: [PATCH 09/15] Handle inReplyTo property --- src/remote/activitypub/create.ts | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/src/remote/activitypub/create.ts b/src/remote/activitypub/create.ts index 31e9dba86..3bc0c66f3 100644 --- a/src/remote/activitypub/create.ts +++ b/src/remote/activitypub/create.ts @@ -48,11 +48,6 @@ class Creator { throw new Error(); } - const mediaIds = 'attachment' in note && - (await Promise.all(await this.create(resolver, note.attachment))) - .filter(media => media !== null && media.object.$ref === 'driveFiles.files') - .map(({ object }) => object.$id); - const { window } = new JSDOM(note.content); const mentions = []; const tags = []; @@ -71,13 +66,27 @@ class Creator { } } + const [mediaIds, reply] = await Promise.all([ + 'attachment' in note && this.create(resolver, note.attachment) + .then(collection => Promise.all(collection)) + .then(collection => collection + .filter(media => media !== null && media.object.$ref === 'driveFiles.files') + .map(({ object }: IResult) => object.$id)), + + 'inReplyTo' in note && this.create(resolver, note.inReplyTo) + .then(collection => Promise.all(collection.map(promise => promise.then(result => { + if (result !== null && result.object.$ref === 'posts') { + throw result.object; + } + }, () => { })))) + .then(() => null, ({ $id }) => Post.findOne({ _id: $id })) + ]); + const inserted = await createPost({ channelId: undefined, index: undefined, createdAt: new Date(note.published), mediaIds, - replyId: undefined, - repostId: undefined, poll: undefined, text: window.document.body.textContent, textHtml: note.content && createDOMPurify(window).sanitize(note.content), @@ -87,7 +96,7 @@ class Creator { geo: undefined, uri: note.id, tags - }, null, null, await Promise.all(mentions)); + }, reply, null, await Promise.all(mentions)); const promises = []; From 068c0b4cceaa9461cc87a6d44abebdfc429d1861 Mon Sep 17 00:00:00 2001 From: Akihiko Odaki Date: Thu, 5 Apr 2018 21:16:14 +0900 Subject: [PATCH 10/15] Add a missing await expression --- src/remote/activitypub/resolve-person.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/remote/activitypub/resolve-person.ts b/src/remote/activitypub/resolve-person.ts index 7ed01e322..a7c0020dd 100644 --- a/src/remote/activitypub/resolve-person.ts +++ b/src/remote/activitypub/resolve-person.ts @@ -10,7 +10,7 @@ async function isCollection(collection) { } export default async (parentResolver, value, verifier?: string) => { - const { resolver, object } = parentResolver.resolveOne(value); + const { resolver, object } = await parentResolver.resolveOne(value); if ( object === null || From f0e8e6392b5ef99488ea0bbecbf9029e30ef0cfa Mon Sep 17 00:00:00 2001 From: Akihiko Odaki Date: Fri, 6 Apr 2018 01:36:34 +0900 Subject: [PATCH 11/15] Allow name property of user to be null --- src/client/app/ch/tags/channel.tag | 10 ++++++-- .../common/scripts/compose-notification.ts | 13 ++++++----- .../common/views/components/autocomplete.vue | 4 +++- .../app/common/views/components/messaging.vue | 6 +++-- .../views/components/welcome-timeline.vue | 4 +++- .../views/components/followers-window.vue | 11 +++++++-- .../views/components/following-window.vue | 11 +++++++-- .../views/components/friends-maker.vue | 4 +++- .../components/messaging-room-window.vue | 6 ++++- .../views/components/notifications.vue | 16 +++++++------ .../views/components/post-detail.sub.vue | 6 ++++- .../desktop/views/components/post-detail.vue | 11 +++++++-- .../desktop/views/components/post-preview.vue | 6 ++++- .../views/components/posts.post.sub.vue | 6 ++++- .../desktop/views/components/posts.post.vue | 6 ++++- .../views/components/settings.mute.vue | 6 +++-- .../views/components/settings.profile.vue | 4 ++-- .../desktop/views/components/ui.header.vue | 9 +++++++- .../views/components/users-list.item.vue | 6 ++++- .../desktop/views/pages/messaging-room.vue | 3 ++- .../pages/user/user.followers-you-know.vue | 6 +++-- .../desktop/views/pages/user/user.header.vue | 6 ++++- .../app/desktop/views/pages/user/user.vue | 3 ++- .../views/widgets/channel.channel.post.vue | 6 ++++- .../app/desktop/views/widgets/profile.vue | 9 +++++++- .../app/desktop/views/widgets/users.vue | 4 +++- .../views/components/notification-preview.vue | 23 +++++++++++++------ .../mobile/views/components/notification.vue | 15 ++++++++---- .../app/mobile/views/components/post-card.vue | 6 ++++- .../views/components/post-detail.sub.vue | 6 ++++- .../mobile/views/components/post-detail.vue | 11 +++++++-- .../mobile/views/components/post-preview.vue | 6 ++++- .../app/mobile/views/components/post.sub.vue | 6 ++++- .../app/mobile/views/components/post.vue | 11 +++++++-- .../app/mobile/views/components/ui.header.vue | 8 ++++++- .../app/mobile/views/components/ui.nav.vue | 8 ++++++- .../app/mobile/views/components/user-card.vue | 6 ++++- .../mobile/views/components/user-preview.vue | 6 ++++- .../app/mobile/views/pages/followers.vue | 10 ++++++-- .../app/mobile/views/pages/following.vue | 10 ++++++-- .../app/mobile/views/pages/messaging-room.vue | 10 ++++++-- .../mobile/views/pages/profile-setting.vue | 4 ++-- .../app/mobile/views/pages/settings.vue | 8 ++++++- src/client/app/mobile/views/pages/user.vue | 11 +++++---- .../pages/user/home.followers-you-know.vue | 8 ++++++- .../app/mobile/views/widgets/profile.vue | 10 +++++++- src/models/user.ts | 6 ++--- src/othello/ai/back.ts | 17 +++++++------- src/renderers/get-notification-summary.ts | 15 ++++++------ src/renderers/get-user-summary.ts | 3 ++- src/server/api/bot/core.ts | 7 +++--- src/server/api/bot/interfaces/line.ts | 5 ++-- 52 files changed, 311 insertions(+), 107 deletions(-) diff --git a/src/client/app/ch/tags/channel.tag b/src/client/app/ch/tags/channel.tag index 1ebc3cceb..4856728de 100644 --- a/src/client/app/ch/tags/channel.tag +++ b/src/client/app/ch/tags/channel.tag @@ -165,7 +165,7 @@
{ post.index }: - { post.user.name } + { getUserName(post.user) } ID:{ acct } @@ -230,10 +230,12 @@ diff --git a/src/client/app/common/scripts/compose-notification.ts b/src/client/app/common/scripts/compose-notification.ts index ebc15952f..e99d50296 100644 --- a/src/client/app/common/scripts/compose-notification.ts +++ b/src/client/app/common/scripts/compose-notification.ts @@ -1,5 +1,6 @@ import getPostSummary from '../../../../renderers/get-post-summary'; import getReactionEmoji from '../../../../renderers/get-reaction-emoji'; +import getUserName from '../../../../renderers/get-user-name'; type Notification = { title: string; @@ -21,35 +22,35 @@ export default function(type, data): Notification { case 'mention': return { - title: `${data.user.name}さんから:`, + title: `${getUserName(data.user)}さんから:`, body: getPostSummary(data), icon: data.user.avatarUrl + '?thumbnail&size=64' }; case 'reply': return { - title: `${data.user.name}さんから返信:`, + title: `${getUserName(data.user)}さんから返信:`, body: getPostSummary(data), icon: data.user.avatarUrl + '?thumbnail&size=64' }; case 'quote': return { - title: `${data.user.name}さんが引用:`, + title: `${getUserName(data.user)}さんが引用:`, body: getPostSummary(data), icon: data.user.avatarUrl + '?thumbnail&size=64' }; case 'reaction': return { - title: `${data.user.name}: ${getReactionEmoji(data.reaction)}:`, + title: `${getUserName(data.user)}: ${getReactionEmoji(data.reaction)}:`, body: getPostSummary(data.post), icon: data.user.avatarUrl + '?thumbnail&size=64' }; case 'unread_messaging_message': return { - title: `${data.user.name}さんからメッセージ:`, + title: `${getUserName(data.user)}さんからメッセージ:`, body: data.text, // TODO: getMessagingMessageSummary(data), icon: data.user.avatarUrl + '?thumbnail&size=64' }; @@ -57,7 +58,7 @@ export default function(type, data): Notification { case 'othello_invited': return { title: '対局への招待があります', - body: `${data.parent.name}さんから`, + body: `${getUserName(data.parent)}さんから`, icon: data.parent.avatarUrl + '?thumbnail&size=64' }; diff --git a/src/client/app/common/views/components/autocomplete.vue b/src/client/app/common/views/components/autocomplete.vue index 38eaf8650..8837fde6b 100644 --- a/src/client/app/common/views/components/autocomplete.vue +++ b/src/client/app/common/views/components/autocomplete.vue @@ -3,7 +3,7 @@
  1. - {{ user.name }} + {{ getUserName(user) }} @{{ getAcct(user) }}
@@ -22,6 +22,7 @@ import Vue from 'vue'; import * as emojilib from 'emojilib'; import contains from '../../../common/scripts/contains'; import getAcct from '../../../../../acct/render'; +import getUserName from '../../../../../renderers/get-user-name'; const lib = Object.entries(emojilib.lib).filter((x: any) => { return x[1].category != 'flags'; @@ -107,6 +108,7 @@ export default Vue.extend({ }, methods: { getAcct, + getUserName, exec() { this.select = -1; if (this.$refs.suggests) { diff --git a/src/client/app/common/views/components/messaging.vue b/src/client/app/common/views/components/messaging.vue index 4ab3e46e8..9b1449daa 100644 --- a/src/client/app/common/views/components/messaging.vue +++ b/src/client/app/common/views/components/messaging.vue @@ -14,7 +14,7 @@ tabindex="-1" > - {{ user.name }} + {{ getUserName(user) }} @{{ getAcct(user) }} @@ -33,7 +33,7 @@
- {{ isMe(message) ? message.recipient.name : message.user.name }} + {{ getUserName(isMe(message) ? message.recipient : message.user) }} @{{ getAcct(isMe(message) ? message.recipient : message.user) }}
@@ -52,6 +52,7 @@ diff --git a/src/client/app/desktop/views/components/following-window.vue b/src/client/app/desktop/views/components/following-window.vue index 612847b38..cbd8ec5f9 100644 --- a/src/client/app/desktop/views/components/following-window.vue +++ b/src/client/app/desktop/views/components/following-window.vue @@ -1,7 +1,7 @@