Extract http request from post delivery job
This commit is contained in:
		
							parent
							
								
									783453897f
								
							
						
					
					
						commit
						df38c2f485
					
				
					 2 changed files with 104 additions and 92 deletions
				
			
		| 
						 | 
					@ -1,8 +1,11 @@
 | 
				
			||||||
 | 
					import Channel from '../models/channel';
 | 
				
			||||||
 | 
					import ChannelWatching from '../models/channel-watching';
 | 
				
			||||||
 | 
					import Following from '../models/following';
 | 
				
			||||||
import Mute from '../models/mute';
 | 
					import Mute from '../models/mute';
 | 
				
			||||||
import Post, { pack } from '../models/post';
 | 
					import Post, { pack } from '../models/post';
 | 
				
			||||||
import Watching from '../models/post-watching';
 | 
					import Watching from '../models/post-watching';
 | 
				
			||||||
import User from '../models/user';
 | 
					import User, { isLocalUser } from '../models/user';
 | 
				
			||||||
import stream from '../publishers/stream';
 | 
					import stream, { publishChannelStream } from '../publishers/stream';
 | 
				
			||||||
import notify from '../publishers/notify';
 | 
					import notify from '../publishers/notify';
 | 
				
			||||||
import pushSw from '../publishers/push-sw';
 | 
					import pushSw from '../publishers/push-sw';
 | 
				
			||||||
import queue from '../queue';
 | 
					import queue from '../queue';
 | 
				
			||||||
| 
						 | 
					@ -21,10 +24,6 @@ export default async (user, mentions, post) => {
 | 
				
			||||||
				latestPost: post._id
 | 
									latestPost: post._id
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}),
 | 
							}),
 | 
				
			||||||
		new Promise((resolve, reject) => queue.create('http', {
 | 
					 | 
				
			||||||
			type: 'deliverPost',
 | 
					 | 
				
			||||||
			id: post._id,
 | 
					 | 
				
			||||||
		}).save(error => error ? reject(error) : resolve())),
 | 
					 | 
				
			||||||
	] as Array<Promise<any>>;
 | 
						] as Array<Promise<any>>;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	function addMention(promisedMentionee, reason) {
 | 
						function addMention(promisedMentionee, reason) {
 | 
				
			||||||
| 
						 | 
					@ -50,6 +49,91 @@ export default async (user, mentions, post) => {
 | 
				
			||||||
		}));
 | 
							}));
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// タイムラインへの投稿
 | 
				
			||||||
 | 
						if (!post.channelId) {
 | 
				
			||||||
 | 
							promises.push(
 | 
				
			||||||
 | 
								// Publish event to myself's stream
 | 
				
			||||||
 | 
								promisedPostObj.then(postObj => {
 | 
				
			||||||
 | 
									stream(post.userId, 'post', postObj);
 | 
				
			||||||
 | 
								}),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								Promise.all([
 | 
				
			||||||
 | 
									User.findOne({ _id: post.userId }),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									// Fetch all followers
 | 
				
			||||||
 | 
									Following.aggregate([{
 | 
				
			||||||
 | 
										$lookup: {
 | 
				
			||||||
 | 
											from: 'users',
 | 
				
			||||||
 | 
											localField: 'followerId',
 | 
				
			||||||
 | 
											foreignField: '_id',
 | 
				
			||||||
 | 
											as: 'follower'
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									}, {
 | 
				
			||||||
 | 
										$match: {
 | 
				
			||||||
 | 
											followeeId: post.userId
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									}], {
 | 
				
			||||||
 | 
										_id: false
 | 
				
			||||||
 | 
									})
 | 
				
			||||||
 | 
								]).then(([user, followers]) => Promise.all(followers.map(following => {
 | 
				
			||||||
 | 
									if (isLocalUser(following.follower)) {
 | 
				
			||||||
 | 
										// Publish event to followers stream
 | 
				
			||||||
 | 
										return promisedPostObj.then(postObj => {
 | 
				
			||||||
 | 
											stream(following.followerId, 'post', postObj);
 | 
				
			||||||
 | 
										});
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									return new Promise((resolve, reject) => {
 | 
				
			||||||
 | 
										queue.create('http', {
 | 
				
			||||||
 | 
											type: 'deliverPost',
 | 
				
			||||||
 | 
											fromId: user._id,
 | 
				
			||||||
 | 
											toId: following.followerId,
 | 
				
			||||||
 | 
											postId: post._id
 | 
				
			||||||
 | 
										}).save(error => {
 | 
				
			||||||
 | 
											if (error) {
 | 
				
			||||||
 | 
												reject(error);
 | 
				
			||||||
 | 
											} else {
 | 
				
			||||||
 | 
												resolve();
 | 
				
			||||||
 | 
											}
 | 
				
			||||||
 | 
										});
 | 
				
			||||||
 | 
									});
 | 
				
			||||||
 | 
								})))
 | 
				
			||||||
 | 
							);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// チャンネルへの投稿
 | 
				
			||||||
 | 
						if (post.channelId) {
 | 
				
			||||||
 | 
							promises.push(
 | 
				
			||||||
 | 
								// Increment channel index(posts count)
 | 
				
			||||||
 | 
								Channel.update({ _id: post.channelId }, {
 | 
				
			||||||
 | 
									$inc: {
 | 
				
			||||||
 | 
										index: 1
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								// Publish event to channel
 | 
				
			||||||
 | 
								promisedPostObj.then(postObj => {
 | 
				
			||||||
 | 
									publishChannelStream(post.channelId, 'post', postObj);
 | 
				
			||||||
 | 
								}),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								Promise.all([
 | 
				
			||||||
 | 
									promisedPostObj,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									// Get channel watchers
 | 
				
			||||||
 | 
									ChannelWatching.find({
 | 
				
			||||||
 | 
										channelId: post.channelId,
 | 
				
			||||||
 | 
										// 削除されたドキュメントは除く
 | 
				
			||||||
 | 
										deletedAt: { $exists: false }
 | 
				
			||||||
 | 
									})
 | 
				
			||||||
 | 
								]).then(([postObj, watches]) => {
 | 
				
			||||||
 | 
									// チャンネルの視聴者(のタイムライン)に配信
 | 
				
			||||||
 | 
									watches.forEach(w => {
 | 
				
			||||||
 | 
										stream(w.userId, 'post', postObj);
 | 
				
			||||||
 | 
									});
 | 
				
			||||||
 | 
								})
 | 
				
			||||||
 | 
							);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// If has in reply to post
 | 
						// If has in reply to post
 | 
				
			||||||
	if (post.replyId) {
 | 
						if (post.replyId) {
 | 
				
			||||||
		promises.push(
 | 
							promises.push(
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,93 +1,21 @@
 | 
				
			||||||
import Channel from '../../models/channel';
 | 
					import Post from '../../models/post';
 | 
				
			||||||
import Following from '../../models/following';
 | 
					import User, { IRemoteUser } from '../../models/user';
 | 
				
			||||||
import ChannelWatching from '../../models/channel-watching';
 | 
					 | 
				
			||||||
import Post, { pack } from '../../models/post';
 | 
					 | 
				
			||||||
import User, { isLocalUser } from '../../models/user';
 | 
					 | 
				
			||||||
import stream, { publishChannelStream } from '../../publishers/stream';
 | 
					 | 
				
			||||||
import context from '../../remote/activitypub/renderer/context';
 | 
					import context from '../../remote/activitypub/renderer/context';
 | 
				
			||||||
import renderCreate from '../../remote/activitypub/renderer/create';
 | 
					import renderCreate from '../../remote/activitypub/renderer/create';
 | 
				
			||||||
import renderNote from '../../remote/activitypub/renderer/note';
 | 
					import renderNote from '../../remote/activitypub/renderer/note';
 | 
				
			||||||
import request from '../../remote/request';
 | 
					import request from '../../remote/request';
 | 
				
			||||||
 | 
					
 | 
				
			||||||
export default ({ data }) => Post.findOne({ _id: data.id }).then(post => {
 | 
					export default async ({ data }) => {
 | 
				
			||||||
	const promisedPostObj = pack(post);
 | 
						const promisedTo = User.findOne({ _id: data.toId }) as Promise<IRemoteUser>;
 | 
				
			||||||
	const promises = [];
 | 
						const [from, post] = await Promise.all([
 | 
				
			||||||
 | 
							User.findOne({ _id: data.fromId }),
 | 
				
			||||||
	// タイムラインへの投稿
 | 
							Post.findOne({ _id: data.postId })
 | 
				
			||||||
	if (!post.channelId) {
 | 
						]);
 | 
				
			||||||
		promises.push(
 | 
						const note = await renderNote(from, post);
 | 
				
			||||||
			// Publish event to myself's stream
 | 
						const to = await promisedTo;
 | 
				
			||||||
			promisedPostObj.then(postObj => {
 | 
					 | 
				
			||||||
				stream(post.userId, 'post', postObj);
 | 
					 | 
				
			||||||
			}),
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			Promise.all([
 | 
					 | 
				
			||||||
				User.findOne({ _id: post.userId }),
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				// Fetch all followers
 | 
					 | 
				
			||||||
				Following.aggregate([{
 | 
					 | 
				
			||||||
					$lookup: {
 | 
					 | 
				
			||||||
						from: 'users',
 | 
					 | 
				
			||||||
						localField: 'followerId',
 | 
					 | 
				
			||||||
						foreignField: '_id',
 | 
					 | 
				
			||||||
						as: 'follower'
 | 
					 | 
				
			||||||
					}
 | 
					 | 
				
			||||||
				}, {
 | 
					 | 
				
			||||||
					$match: {
 | 
					 | 
				
			||||||
						followeeId: post.userId
 | 
					 | 
				
			||||||
					}
 | 
					 | 
				
			||||||
				}], {
 | 
					 | 
				
			||||||
					_id: false
 | 
					 | 
				
			||||||
				})
 | 
					 | 
				
			||||||
			]).then(([user, followers]) => Promise.all(followers.map(following => {
 | 
					 | 
				
			||||||
				if (isLocalUser(following.follower)) {
 | 
					 | 
				
			||||||
					// Publish event to followers stream
 | 
					 | 
				
			||||||
					return promisedPostObj.then(postObj => {
 | 
					 | 
				
			||||||
						stream(following.followerId, 'post', postObj);
 | 
					 | 
				
			||||||
					});
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				return renderNote(user, post).then(note => {
 | 
					 | 
				
			||||||
	const create = renderCreate(note);
 | 
						const create = renderCreate(note);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	create['@context'] = context;
 | 
						create['@context'] = context;
 | 
				
			||||||
					return request(user, following.follower[0].account.inbox, create);
 | 
					 | 
				
			||||||
				});
 | 
					 | 
				
			||||||
			})))
 | 
					 | 
				
			||||||
		);
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// チャンネルへの投稿
 | 
						return request(from, to.account.inbox, create);
 | 
				
			||||||
	if (post.channelId) {
 | 
					};
 | 
				
			||||||
		promises.push(
 | 
					 | 
				
			||||||
			// Increment channel index(posts count)
 | 
					 | 
				
			||||||
			Channel.update({ _id: post.channelId }, {
 | 
					 | 
				
			||||||
				$inc: {
 | 
					 | 
				
			||||||
					index: 1
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			}),
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			// Publish event to channel
 | 
					 | 
				
			||||||
			promisedPostObj.then(postObj => {
 | 
					 | 
				
			||||||
				publishChannelStream(post.channelId, 'post', postObj);
 | 
					 | 
				
			||||||
			}),
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			Promise.all([
 | 
					 | 
				
			||||||
				promisedPostObj,
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				// Get channel watchers
 | 
					 | 
				
			||||||
				ChannelWatching.find({
 | 
					 | 
				
			||||||
					channelId: post.channelId,
 | 
					 | 
				
			||||||
					// 削除されたドキュメントは除く
 | 
					 | 
				
			||||||
					deletedAt: { $exists: false }
 | 
					 | 
				
			||||||
				})
 | 
					 | 
				
			||||||
			]).then(([postObj, watches]) => {
 | 
					 | 
				
			||||||
				// チャンネルの視聴者(のタイムライン)に配信
 | 
					 | 
				
			||||||
				watches.forEach(w => {
 | 
					 | 
				
			||||||
					stream(w.userId, 'post', postObj);
 | 
					 | 
				
			||||||
				});
 | 
					 | 
				
			||||||
			})
 | 
					 | 
				
			||||||
		);
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return Promise.all(promises);
 | 
					 | 
				
			||||||
});
 | 
					 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue