wip
This commit is contained in:
		
							parent
							
								
									b60121527d
								
							
						
					
					
						commit
						e8b42d7e16
					
				
					 20 changed files with 354 additions and 377 deletions
				
			
		
							
								
								
									
										37
									
								
								src/queue/index.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										37
									
								
								src/queue/index.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,37 @@
 | 
			
		|||
import { createQueue } from 'kue';
 | 
			
		||||
import config from '../config';
 | 
			
		||||
import db from './processors/db';
 | 
			
		||||
import http from './processors/http';
 | 
			
		||||
 | 
			
		||||
const queue = createQueue({
 | 
			
		||||
	redis: {
 | 
			
		||||
		port: config.redis.port,
 | 
			
		||||
		host: config.redis.host,
 | 
			
		||||
		auth: config.redis.pass
 | 
			
		||||
	}
 | 
			
		||||
});
 | 
			
		||||
 | 
			
		||||
export function createHttp(data) {
 | 
			
		||||
	return queue
 | 
			
		||||
		.create('http', data)
 | 
			
		||||
		.attempts(16)
 | 
			
		||||
		.backoff({ delay: 16384, type: 'exponential' });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export function createDb(data) {
 | 
			
		||||
	return queue.create('db', data);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export function process() {
 | 
			
		||||
	queue.process('db', db);
 | 
			
		||||
 | 
			
		||||
	/*
 | 
			
		||||
		256 is the default concurrency limit of Mozilla Firefox and Google
 | 
			
		||||
		Chromium.
 | 
			
		||||
		a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google
 | 
			
		||||
		https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff
 | 
			
		||||
		Network.http.max-connections - MozillaZine Knowledge Base
 | 
			
		||||
		http://kb.mozillazine.org/Network.http.max-connections
 | 
			
		||||
	*/
 | 
			
		||||
	queue.process('http', 256, http);
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										22
									
								
								src/queue/processors/db/delete-post-dependents.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										22
									
								
								src/queue/processors/db/delete-post-dependents.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,22 @@
 | 
			
		|||
import Favorite from '../../models/favorite';
 | 
			
		||||
import Notification from '../../models/notification';
 | 
			
		||||
import PollVote from '../../models/poll-vote';
 | 
			
		||||
import PostReaction from '../../models/post-reaction';
 | 
			
		||||
import PostWatching from '../../models/post-watching';
 | 
			
		||||
import Post from '../../models/post';
 | 
			
		||||
 | 
			
		||||
export default async ({ data }) => Promise.all([
 | 
			
		||||
	Favorite.remove({ postId: data._id }),
 | 
			
		||||
	Notification.remove({ postId: data._id }),
 | 
			
		||||
	PollVote.remove({ postId: data._id }),
 | 
			
		||||
	PostReaction.remove({ postId: data._id }),
 | 
			
		||||
	PostWatching.remove({ postId: data._id }),
 | 
			
		||||
	Post.find({ repostId: data._id }).then(reposts => Promise.all([
 | 
			
		||||
		Notification.remove({
 | 
			
		||||
			postId: {
 | 
			
		||||
				$in: reposts.map(({ _id }) => _id)
 | 
			
		||||
			}
 | 
			
		||||
		}),
 | 
			
		||||
		Post.remove({ repostId: data._id })
 | 
			
		||||
	]))
 | 
			
		||||
]);
 | 
			
		||||
							
								
								
									
										7
									
								
								src/queue/processors/db/index.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								src/queue/processors/db/index.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,7 @@
 | 
			
		|||
import deletePostDependents from './delete-post-dependents';
 | 
			
		||||
 | 
			
		||||
const handlers = {
 | 
			
		||||
  deletePostDependents
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
export default (job, done) => handlers[job.data.type](job).then(() => done(), done);
 | 
			
		||||
							
								
								
									
										17
									
								
								src/queue/processors/http/deliver.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										17
									
								
								src/queue/processors/http/deliver.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,17 @@
 | 
			
		|||
import * as kue from 'kue';
 | 
			
		||||
 | 
			
		||||
import Channel from '../../models/channel';
 | 
			
		||||
import Following from '../../models/following';
 | 
			
		||||
import ChannelWatching from '../../models/channel-watching';
 | 
			
		||||
import Post, { pack } from '../../models/post';
 | 
			
		||||
import User, { isLocalUser } from '../../models/user';
 | 
			
		||||
import stream, { publishChannelStream } from '../../publishers/stream';
 | 
			
		||||
import context from '../../remote/activitypub/renderer/context';
 | 
			
		||||
import renderCreate from '../../remote/activitypub/renderer/create';
 | 
			
		||||
import renderNote from '../../remote/activitypub/renderer/note';
 | 
			
		||||
import request from '../../remote/request';
 | 
			
		||||
 | 
			
		||||
export default async (job: kue.Job, done): Promise<void> => {
 | 
			
		||||
 | 
			
		||||
	request(user, following.follower[0].account.inbox, create);
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										69
									
								
								src/queue/processors/http/follow.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										69
									
								
								src/queue/processors/http/follow.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,69 @@
 | 
			
		|||
import User, { isLocalUser, pack as packUser } from '../../models/user';
 | 
			
		||||
import Following from '../../models/following';
 | 
			
		||||
import FollowingLog from '../../models/following-log';
 | 
			
		||||
import FollowedLog from '../../models/followed-log';
 | 
			
		||||
import event from '../../publishers/stream';
 | 
			
		||||
import notify from '../../publishers/notify';
 | 
			
		||||
import context from '../../remote/activitypub/renderer/context';
 | 
			
		||||
import render from '../../remote/activitypub/renderer/follow';
 | 
			
		||||
import request from '../../remote/request';
 | 
			
		||||
 | 
			
		||||
export default ({ data }) => Following.findOne({ _id: data.following }).then(({ followerId, followeeId }) => {
 | 
			
		||||
	const promisedFollower = User.findOne({ _id: followerId });
 | 
			
		||||
	const promisedFollowee = User.findOne({ _id: followeeId });
 | 
			
		||||
 | 
			
		||||
	return Promise.all([
 | 
			
		||||
		// Increment following count
 | 
			
		||||
		User.update(followerId, {
 | 
			
		||||
			$inc: {
 | 
			
		||||
				followingCount: 1
 | 
			
		||||
			}
 | 
			
		||||
		}),
 | 
			
		||||
 | 
			
		||||
		promisedFollower.then(({ followingCount }) => FollowingLog.insert({
 | 
			
		||||
			createdAt: data.following.createdAt,
 | 
			
		||||
			userId: followerId,
 | 
			
		||||
			count: followingCount + 1
 | 
			
		||||
		})),
 | 
			
		||||
 | 
			
		||||
		// Increment followers count
 | 
			
		||||
		User.update({ _id: followeeId }, {
 | 
			
		||||
			$inc: {
 | 
			
		||||
				followersCount: 1
 | 
			
		||||
			}
 | 
			
		||||
		}),
 | 
			
		||||
 | 
			
		||||
		promisedFollowee.then(({ followersCount }) => FollowedLog.insert({
 | 
			
		||||
			createdAt: data.following.createdAt,
 | 
			
		||||
			userId: followerId,
 | 
			
		||||
			count: followersCount + 1
 | 
			
		||||
		})),
 | 
			
		||||
 | 
			
		||||
		// Notify
 | 
			
		||||
		promisedFollowee.then(followee => followee.host === null ?
 | 
			
		||||
			notify(followeeId, followerId, 'follow') : null),
 | 
			
		||||
 | 
			
		||||
		// Publish follow event
 | 
			
		||||
		Promise.all([promisedFollower, promisedFollowee]).then(([follower, followee]) => {
 | 
			
		||||
			let followerEvent;
 | 
			
		||||
			let followeeEvent;
 | 
			
		||||
 | 
			
		||||
			if (isLocalUser(follower)) {
 | 
			
		||||
				followerEvent = packUser(followee, follower)
 | 
			
		||||
					.then(packed => event(follower._id, 'follow', packed));
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			if (isLocalUser(followee)) {
 | 
			
		||||
				followeeEvent = packUser(follower, followee)
 | 
			
		||||
					.then(packed => event(followee._id, 'followed', packed));
 | 
			
		||||
			} else if (isLocalUser(follower)) {
 | 
			
		||||
				const rendered = render(follower, followee);
 | 
			
		||||
				rendered['@context'] = context;
 | 
			
		||||
 | 
			
		||||
				followeeEvent = request(follower, followee.account.inbox, rendered);
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			return Promise.all([followerEvent, followeeEvent]);
 | 
			
		||||
		})
 | 
			
		||||
	]);
 | 
			
		||||
});
 | 
			
		||||
							
								
								
									
										17
									
								
								src/queue/processors/http/index.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										17
									
								
								src/queue/processors/http/index.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,17 @@
 | 
			
		|||
import deliverPost from './deliver-post';
 | 
			
		||||
import follow from './follow';
 | 
			
		||||
import performActivityPub from './perform-activitypub';
 | 
			
		||||
import processInbox from './process-inbox';
 | 
			
		||||
import reportGitHubFailure from './report-github-failure';
 | 
			
		||||
import unfollow from './unfollow';
 | 
			
		||||
 | 
			
		||||
const handlers = {
 | 
			
		||||
  deliverPost,
 | 
			
		||||
  follow,
 | 
			
		||||
  performActivityPub,
 | 
			
		||||
  processInbox,
 | 
			
		||||
  reportGitHubFailure,
 | 
			
		||||
  unfollow
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
export default (job, done) => handlers[job.data.type](job).then(() => done(), done);
 | 
			
		||||
							
								
								
									
										7
									
								
								src/queue/processors/http/perform-activitypub.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								src/queue/processors/http/perform-activitypub.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,7 @@
 | 
			
		|||
import User from '../../models/user';
 | 
			
		||||
import act from '../../remote/activitypub/act';
 | 
			
		||||
import Resolver from '../../remote/activitypub/resolver';
 | 
			
		||||
 | 
			
		||||
export default ({ data }) => User.findOne({ _id: data.actor })
 | 
			
		||||
	.then(actor => act(new Resolver(), actor, data.outbox))
 | 
			
		||||
	.then(Promise.all);
 | 
			
		||||
							
								
								
									
										55
									
								
								src/queue/processors/http/process-inbox.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										55
									
								
								src/queue/processors/http/process-inbox.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,55 @@
 | 
			
		|||
import * as kue from 'kue';
 | 
			
		||||
 | 
			
		||||
import { verifySignature } from 'http-signature';
 | 
			
		||||
import parseAcct from '../../acct/parse';
 | 
			
		||||
import User, { IRemoteUser } from '../../models/user';
 | 
			
		||||
import act from '../../remote/activitypub/act';
 | 
			
		||||
import resolvePerson from '../../remote/activitypub/resolve-person';
 | 
			
		||||
import Resolver from '../../remote/activitypub/resolver';
 | 
			
		||||
 | 
			
		||||
// ユーザーのinboxにアクティビティが届いた時の処理
 | 
			
		||||
export default async (job: kue.Job, done): Promise<void> => {
 | 
			
		||||
	const signature = job.data.signature;
 | 
			
		||||
	const activity = job.data.activity;
 | 
			
		||||
 | 
			
		||||
	const keyIdLower = signature.keyId.toLowerCase();
 | 
			
		||||
	let user;
 | 
			
		||||
 | 
			
		||||
	if (keyIdLower.startsWith('acct:')) {
 | 
			
		||||
		const { username, host } = parseAcct(keyIdLower.slice('acct:'.length));
 | 
			
		||||
		if (host === null) {
 | 
			
		||||
			console.warn(`request was made by local user: @${username}`);
 | 
			
		||||
			done();
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser;
 | 
			
		||||
	} else {
 | 
			
		||||
		user = await User.findOne({
 | 
			
		||||
			host: { $ne: null },
 | 
			
		||||
			'account.publicKey.id': signature.keyId
 | 
			
		||||
		}) as IRemoteUser;
 | 
			
		||||
 | 
			
		||||
		// アクティビティを送信してきたユーザーがまだMisskeyサーバーに登録されていなかったら登録する
 | 
			
		||||
		if (user === null) {
 | 
			
		||||
			user = await resolvePerson(signature.keyId);
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if (user === null) {
 | 
			
		||||
		done(new Error('failed to resolve user'));
 | 
			
		||||
		return;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if (!verifySignature(signature, user.account.publicKey.publicKeyPem)) {
 | 
			
		||||
		done(new Error('signature verification failed'));
 | 
			
		||||
		return;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// アクティビティを処理
 | 
			
		||||
	try {
 | 
			
		||||
		await act(new Resolver(), user, activity);
 | 
			
		||||
		done();
 | 
			
		||||
	} catch (e) {
 | 
			
		||||
		done(e);
 | 
			
		||||
	}
 | 
			
		||||
};
 | 
			
		||||
							
								
								
									
										24
									
								
								src/queue/processors/http/report-github-failure.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										24
									
								
								src/queue/processors/http/report-github-failure.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,24 @@
 | 
			
		|||
import * as request from 'request-promise-native';
 | 
			
		||||
import User from '../../models/user';
 | 
			
		||||
const createPost = require('../../server/api/endpoints/posts/create');
 | 
			
		||||
 | 
			
		||||
export default async ({ data }) => {
 | 
			
		||||
	const asyncBot = User.findOne({ _id: data.userId });
 | 
			
		||||
 | 
			
		||||
	// Fetch parent status
 | 
			
		||||
	const parentStatuses = await request({
 | 
			
		||||
		url: `${data.parentUrl}/statuses`,
 | 
			
		||||
		headers: {
 | 
			
		||||
			'User-Agent': 'misskey'
 | 
			
		||||
		},
 | 
			
		||||
		json: true
 | 
			
		||||
	});
 | 
			
		||||
 | 
			
		||||
	const parentState = parentStatuses[0].state;
 | 
			
		||||
	const stillFailed = parentState == 'failure' || parentState == 'error';
 | 
			
		||||
	const text = stillFailed ?
 | 
			
		||||
		`**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` :
 | 
			
		||||
		`**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`;
 | 
			
		||||
 | 
			
		||||
	createPost({ text }, await asyncBot);
 | 
			
		||||
};
 | 
			
		||||
							
								
								
									
										56
									
								
								src/queue/processors/http/unfollow.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										56
									
								
								src/queue/processors/http/unfollow.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,56 @@
 | 
			
		|||
import FollowedLog from '../../models/followed-log';
 | 
			
		||||
import Following from '../../models/following';
 | 
			
		||||
import FollowingLog from '../../models/following-log';
 | 
			
		||||
import User, { isRemoteUser, pack as packUser } from '../../models/user';
 | 
			
		||||
import stream from '../../publishers/stream';
 | 
			
		||||
import renderFollow from '../../remote/activitypub/renderer/follow';
 | 
			
		||||
import renderUndo from '../../remote/activitypub/renderer/undo';
 | 
			
		||||
import context from '../../remote/activitypub/renderer/context';
 | 
			
		||||
import request from '../../remote/request';
 | 
			
		||||
 | 
			
		||||
export default async ({ data }) => {
 | 
			
		||||
	// Delete following
 | 
			
		||||
	const following = await Following.findOneAndDelete({ _id: data.id });
 | 
			
		||||
	if (following === null) {
 | 
			
		||||
		return;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	const promisedFollower = User.findOne({ _id: following.followerId });
 | 
			
		||||
	const promisedFollowee = User.findOne({ _id: following.followeeId });
 | 
			
		||||
 | 
			
		||||
	await Promise.all([
 | 
			
		||||
		// Decrement following count
 | 
			
		||||
		User.update({ _id: following.followerId }, { $inc: { followingCount: -1 } }),
 | 
			
		||||
		promisedFollower.then(({ followingCount }) => FollowingLog.insert({
 | 
			
		||||
			createdAt: new Date(),
 | 
			
		||||
			userId: following.followerId,
 | 
			
		||||
			count: followingCount - 1
 | 
			
		||||
		})),
 | 
			
		||||
 | 
			
		||||
		// Decrement followers count
 | 
			
		||||
		User.update({ _id: following.followeeId }, { $inc: { followersCount: -1 } }),
 | 
			
		||||
		promisedFollowee.then(({ followersCount }) => FollowedLog.insert({
 | 
			
		||||
			createdAt: new Date(),
 | 
			
		||||
			userId: following.followeeId,
 | 
			
		||||
			count: followersCount - 1
 | 
			
		||||
		})),
 | 
			
		||||
 | 
			
		||||
		// Publish follow event
 | 
			
		||||
		Promise.all([promisedFollower, promisedFollowee]).then(async ([follower, followee]) => {
 | 
			
		||||
			if (isRemoteUser(follower)) {
 | 
			
		||||
				return;
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			const promisedPackedUser = packUser(followee, follower);
 | 
			
		||||
 | 
			
		||||
			if (isRemoteUser(followee)) {
 | 
			
		||||
				const undo = renderUndo(renderFollow(follower, followee));
 | 
			
		||||
				undo['@context'] = context;
 | 
			
		||||
 | 
			
		||||
				await request(follower, followee.account.inbox, undo);
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			stream(follower._id, 'unfollow', promisedPackedUser);
 | 
			
		||||
		})
 | 
			
		||||
	]);
 | 
			
		||||
};
 | 
			
		||||
							
								
								
									
										18
									
								
								src/queue/processors/index.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										18
									
								
								src/queue/processors/index.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,18 @@
 | 
			
		|||
import queue from '../queue';
 | 
			
		||||
import db from './db';
 | 
			
		||||
import http from './http';
 | 
			
		||||
 | 
			
		||||
export default () => {
 | 
			
		||||
	queue.process('db', db);
 | 
			
		||||
 | 
			
		||||
	/*
 | 
			
		||||
		256 is the default concurrency limit of Mozilla Firefox and Google
 | 
			
		||||
		Chromium.
 | 
			
		||||
 | 
			
		||||
		a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google
 | 
			
		||||
		https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff
 | 
			
		||||
		Network.http.max-connections - MozillaZine Knowledge Base
 | 
			
		||||
		http://kb.mozillazine.org/Network.http.max-connections
 | 
			
		||||
	*/
 | 
			
		||||
	queue.process('http', 256, http);
 | 
			
		||||
};
 | 
			
		||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue