Retry HTTP requests
This commit is contained in:
		
							parent
							
								
									86b1345c17
								
							
						
					
					
						commit
						d7c13b975f
					
				
					 24 changed files with 145 additions and 93 deletions
				
			
		
							
								
								
									
										38
									
								
								src/queue/index.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										38
									
								
								src/queue/index.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,38 @@
 | 
			
		|||
import { createQueue } from 'kue';
 | 
			
		||||
import config from '../config';
 | 
			
		||||
import db from './processors/db';
 | 
			
		||||
import http from './processors/http';
 | 
			
		||||
 | 
			
		||||
const queue = createQueue({
 | 
			
		||||
	redis: {
 | 
			
		||||
		port: config.redis.port,
 | 
			
		||||
		host: config.redis.host,
 | 
			
		||||
		auth: config.redis.pass
 | 
			
		||||
	}
 | 
			
		||||
});
 | 
			
		||||
 | 
			
		||||
export function createHttp(data) {
 | 
			
		||||
	return queue
 | 
			
		||||
		.create('http', data)
 | 
			
		||||
		.attempts(16)
 | 
			
		||||
		.backoff({ delay: 16384, type: 'exponential' });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export function createDb(data) {
 | 
			
		||||
	return queue.create('db', data);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export function process() {
 | 
			
		||||
	queue.process('db', db);
 | 
			
		||||
 | 
			
		||||
	/*
 | 
			
		||||
		256 is the default concurrency limit of Mozilla Firefox and Google
 | 
			
		||||
		Chromium.
 | 
			
		||||
 | 
			
		||||
		a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google
 | 
			
		||||
		https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff
 | 
			
		||||
		Network.http.max-connections - MozillaZine Knowledge Base
 | 
			
		||||
		http://kb.mozillazine.org/Network.http.max-connections
 | 
			
		||||
	*/
 | 
			
		||||
	queue.process('http', 256, http);
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										22
									
								
								src/queue/processors/db/delete-post-dependents.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										22
									
								
								src/queue/processors/db/delete-post-dependents.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,22 @@
 | 
			
		|||
import Favorite from '../../../models/favorite';
 | 
			
		||||
import Notification from '../../../models/notification';
 | 
			
		||||
import PollVote from '../../../models/poll-vote';
 | 
			
		||||
import PostReaction from '../../../models/post-reaction';
 | 
			
		||||
import PostWatching from '../../../models/post-watching';
 | 
			
		||||
import Post from '../../../models/post';
 | 
			
		||||
 | 
			
		||||
export default async ({ data }) => Promise.all([
 | 
			
		||||
	Favorite.remove({ postId: data._id }),
 | 
			
		||||
	Notification.remove({ postId: data._id }),
 | 
			
		||||
	PollVote.remove({ postId: data._id }),
 | 
			
		||||
	PostReaction.remove({ postId: data._id }),
 | 
			
		||||
	PostWatching.remove({ postId: data._id }),
 | 
			
		||||
	Post.find({ repostId: data._id }).then(reposts => Promise.all([
 | 
			
		||||
		Notification.remove({
 | 
			
		||||
			postId: {
 | 
			
		||||
				$in: reposts.map(({ _id }) => _id)
 | 
			
		||||
			}
 | 
			
		||||
		}),
 | 
			
		||||
		Post.remove({ repostId: data._id })
 | 
			
		||||
	]))
 | 
			
		||||
]);
 | 
			
		||||
							
								
								
									
										7
									
								
								src/queue/processors/db/index.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								src/queue/processors/db/index.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,7 @@
 | 
			
		|||
import deletePostDependents from './delete-post-dependents';
 | 
			
		||||
 | 
			
		||||
const handlers = {
 | 
			
		||||
  deletePostDependents
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
export default (job, done) => handlers[job.data.type](job).then(() => done(), done);
 | 
			
		||||
							
								
								
									
										21
									
								
								src/queue/processors/http/deliver-post.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										21
									
								
								src/queue/processors/http/deliver-post.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,21 @@
 | 
			
		|||
import Post from '../../../models/post';
 | 
			
		||||
import User, { IRemoteUser } from '../../../models/user';
 | 
			
		||||
import context from '../../../remote/activitypub/renderer/context';
 | 
			
		||||
import renderCreate from '../../../remote/activitypub/renderer/create';
 | 
			
		||||
import renderNote from '../../../remote/activitypub/renderer/note';
 | 
			
		||||
import request from '../../../remote/request';
 | 
			
		||||
 | 
			
		||||
export default async ({ data }) => {
 | 
			
		||||
	const promisedTo = User.findOne({ _id: data.toId }) as Promise<IRemoteUser>;
 | 
			
		||||
	const [from, post] = await Promise.all([
 | 
			
		||||
		User.findOne({ _id: data.fromId }),
 | 
			
		||||
		Post.findOne({ _id: data.postId })
 | 
			
		||||
	]);
 | 
			
		||||
	const note = await renderNote(from, post);
 | 
			
		||||
	const to = await promisedTo;
 | 
			
		||||
	const create = renderCreate(note);
 | 
			
		||||
 | 
			
		||||
	create['@context'] = context;
 | 
			
		||||
 | 
			
		||||
	return request(from, to.account.inbox, create);
 | 
			
		||||
};
 | 
			
		||||
							
								
								
									
										69
									
								
								src/queue/processors/http/follow.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										69
									
								
								src/queue/processors/http/follow.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,69 @@
 | 
			
		|||
import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user';
 | 
			
		||||
import Following from '../../../models/following';
 | 
			
		||||
import FollowingLog from '../../../models/following-log';
 | 
			
		||||
import FollowedLog from '../../../models/followed-log';
 | 
			
		||||
import event from '../../../publishers/stream';
 | 
			
		||||
import notify from '../../../publishers/notify';
 | 
			
		||||
import context from '../../../remote/activitypub/renderer/context';
 | 
			
		||||
import render from '../../../remote/activitypub/renderer/follow';
 | 
			
		||||
import request from '../../../remote/request';
 | 
			
		||||
import Logger from '../../../utils/logger';
 | 
			
		||||
 | 
			
		||||
export default async ({ data }) => {
 | 
			
		||||
	const { followerId, followeeId } = await Following.findOne({ _id: data.following });
 | 
			
		||||
	const [follower, followee] = await Promise.all([
 | 
			
		||||
		User.findOne({ _id: followerId }),
 | 
			
		||||
		User.findOne({ _id: followeeId })
 | 
			
		||||
	]);
 | 
			
		||||
 | 
			
		||||
	if (isLocalUser(follower) && isRemoteUser(followee)) {
 | 
			
		||||
		const rendered = render(follower, followee);
 | 
			
		||||
		rendered['@context'] = context;
 | 
			
		||||
 | 
			
		||||
		await request(follower, followee.account.inbox, rendered);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	try {
 | 
			
		||||
		await Promise.all([
 | 
			
		||||
			// Increment following count
 | 
			
		||||
			User.update(followerId, {
 | 
			
		||||
				$inc: {
 | 
			
		||||
					followingCount: 1
 | 
			
		||||
				}
 | 
			
		||||
			}),
 | 
			
		||||
 | 
			
		||||
			FollowingLog.insert({
 | 
			
		||||
				createdAt: data.following.createdAt,
 | 
			
		||||
				userId: followerId,
 | 
			
		||||
				count: follower.followingCount + 1
 | 
			
		||||
			}),
 | 
			
		||||
 | 
			
		||||
			// Increment followers count
 | 
			
		||||
			User.update({ _id: followeeId }, {
 | 
			
		||||
				$inc: {
 | 
			
		||||
					followersCount: 1
 | 
			
		||||
				}
 | 
			
		||||
			}),
 | 
			
		||||
 | 
			
		||||
			FollowedLog.insert({
 | 
			
		||||
				createdAt: data.following.createdAt,
 | 
			
		||||
				userId: followerId,
 | 
			
		||||
				count: followee.followersCount + 1
 | 
			
		||||
			}),
 | 
			
		||||
 | 
			
		||||
			// Publish follow event
 | 
			
		||||
			isLocalUser(follower) && packUser(followee, follower)
 | 
			
		||||
				.then(packed => event(follower._id, 'follow', packed)),
 | 
			
		||||
 | 
			
		||||
			isLocalUser(followee) && Promise.all([
 | 
			
		||||
				packUser(follower, followee)
 | 
			
		||||
					.then(packed => event(followee._id, 'followed', packed)),
 | 
			
		||||
 | 
			
		||||
				// Notify
 | 
			
		||||
				isLocalUser(followee) && notify(followeeId, followerId, 'follow')
 | 
			
		||||
			])
 | 
			
		||||
		]);
 | 
			
		||||
	} catch (error) {
 | 
			
		||||
		Logger.error(error.toString());
 | 
			
		||||
	}
 | 
			
		||||
};
 | 
			
		||||
							
								
								
									
										17
									
								
								src/queue/processors/http/index.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										17
									
								
								src/queue/processors/http/index.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,17 @@
 | 
			
		|||
import deliverPost from './deliver-post';
 | 
			
		||||
import follow from './follow';
 | 
			
		||||
import performActivityPub from './perform-activitypub';
 | 
			
		||||
import processInbox from './process-inbox';
 | 
			
		||||
import reportGitHubFailure from './report-github-failure';
 | 
			
		||||
import unfollow from './unfollow';
 | 
			
		||||
 | 
			
		||||
const handlers = {
 | 
			
		||||
  deliverPost,
 | 
			
		||||
  follow,
 | 
			
		||||
  performActivityPub,
 | 
			
		||||
  processInbox,
 | 
			
		||||
  reportGitHubFailure,
 | 
			
		||||
  unfollow
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
export default (job, done) => handlers[job.data.type](job).then(() => done(), done);
 | 
			
		||||
							
								
								
									
										7
									
								
								src/queue/processors/http/perform-activitypub.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								src/queue/processors/http/perform-activitypub.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,7 @@
 | 
			
		|||
import User from '../../../models/user';
 | 
			
		||||
import act from '../../../remote/activitypub/act';
 | 
			
		||||
import Resolver from '../../../remote/activitypub/resolver';
 | 
			
		||||
 | 
			
		||||
export default ({ data }) => User.findOne({ _id: data.actor })
 | 
			
		||||
	.then(actor => act(new Resolver(), actor, data.outbox))
 | 
			
		||||
	.then(Promise.all);
 | 
			
		||||
							
								
								
									
										39
									
								
								src/queue/processors/http/process-inbox.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										39
									
								
								src/queue/processors/http/process-inbox.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,39 @@
 | 
			
		|||
import { verifySignature } from 'http-signature';
 | 
			
		||||
import parseAcct from '../../../acct/parse';
 | 
			
		||||
import User, { IRemoteUser } from '../../../models/user';
 | 
			
		||||
import act from '../../../remote/activitypub/act';
 | 
			
		||||
import resolvePerson from '../../../remote/activitypub/resolve-person';
 | 
			
		||||
import Resolver from '../../../remote/activitypub/resolver';
 | 
			
		||||
 | 
			
		||||
export default async ({ data }): Promise<void> => {
 | 
			
		||||
	const keyIdLower = data.signature.keyId.toLowerCase();
 | 
			
		||||
	let user;
 | 
			
		||||
 | 
			
		||||
	if (keyIdLower.startsWith('acct:')) {
 | 
			
		||||
		const { username, host } = parseAcct(keyIdLower.slice('acct:'.length));
 | 
			
		||||
		if (host === null) {
 | 
			
		||||
			throw 'request was made by local user';
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser;
 | 
			
		||||
	} else {
 | 
			
		||||
		user = await User.findOne({
 | 
			
		||||
			host: { $ne: null },
 | 
			
		||||
			'account.publicKey.id': data.signature.keyId
 | 
			
		||||
		}) as IRemoteUser;
 | 
			
		||||
 | 
			
		||||
		if (user === null) {
 | 
			
		||||
			user = await resolvePerson(data.signature.keyId);
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if (user === null) {
 | 
			
		||||
		throw 'failed to resolve user';
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if (!verifySignature(data.signature, user.account.publicKey.publicKeyPem)) {
 | 
			
		||||
		throw 'signature verification failed';
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	await Promise.all(await act(new Resolver(), user, data.inbox, true));
 | 
			
		||||
};
 | 
			
		||||
							
								
								
									
										24
									
								
								src/queue/processors/http/report-github-failure.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										24
									
								
								src/queue/processors/http/report-github-failure.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,24 @@
 | 
			
		|||
import * as request from 'request-promise-native';
 | 
			
		||||
import User from '../../../models/user';
 | 
			
		||||
const createPost = require('../../../server/api/endpoints/posts/create');
 | 
			
		||||
 | 
			
		||||
export default async ({ data }) => {
 | 
			
		||||
	const asyncBot = User.findOne({ _id: data.userId });
 | 
			
		||||
 | 
			
		||||
	// Fetch parent status
 | 
			
		||||
	const parentStatuses = await request({
 | 
			
		||||
		url: `${data.parentUrl}/statuses`,
 | 
			
		||||
		headers: {
 | 
			
		||||
			'User-Agent': 'misskey'
 | 
			
		||||
		},
 | 
			
		||||
		json: true
 | 
			
		||||
	});
 | 
			
		||||
 | 
			
		||||
	const parentState = parentStatuses[0].state;
 | 
			
		||||
	const stillFailed = parentState == 'failure' || parentState == 'error';
 | 
			
		||||
	const text = stillFailed ?
 | 
			
		||||
		`**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` :
 | 
			
		||||
		`**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`;
 | 
			
		||||
 | 
			
		||||
	createPost({ text }, await asyncBot);
 | 
			
		||||
};
 | 
			
		||||
							
								
								
									
										63
									
								
								src/queue/processors/http/unfollow.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										63
									
								
								src/queue/processors/http/unfollow.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,63 @@
 | 
			
		|||
import FollowedLog from '../../../models/followed-log';
 | 
			
		||||
import Following from '../../../models/following';
 | 
			
		||||
import FollowingLog from '../../../models/following-log';
 | 
			
		||||
import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user';
 | 
			
		||||
import stream from '../../../publishers/stream';
 | 
			
		||||
import renderFollow from '../../../remote/activitypub/renderer/follow';
 | 
			
		||||
import renderUndo from '../../../remote/activitypub/renderer/undo';
 | 
			
		||||
import context from '../../../remote/activitypub/renderer/context';
 | 
			
		||||
import request from '../../../remote/request';
 | 
			
		||||
import Logger from '../../../utils/logger';
 | 
			
		||||
 | 
			
		||||
export default async ({ data }) => {
 | 
			
		||||
	const following = await Following.findOne({ _id: data.id });
 | 
			
		||||
	if (following === null) {
 | 
			
		||||
		return;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	const [follower, followee] = await Promise.all([
 | 
			
		||||
		User.findOne({ _id: following.followerId }),
 | 
			
		||||
		User.findOne({ _id: following.followeeId })
 | 
			
		||||
	]);
 | 
			
		||||
 | 
			
		||||
	if (isLocalUser(follower) && isRemoteUser(followee)) {
 | 
			
		||||
		const undo = renderUndo(renderFollow(follower, followee));
 | 
			
		||||
		undo['@context'] = context;
 | 
			
		||||
 | 
			
		||||
		await request(follower, followee.account.inbox, undo);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	try {
 | 
			
		||||
		await Promise.all([
 | 
			
		||||
			// Delete following
 | 
			
		||||
			Following.findOneAndDelete({ _id: data.id }),
 | 
			
		||||
 | 
			
		||||
			// Decrement following count
 | 
			
		||||
			User.update({ _id: follower._id }, { $inc: { followingCount: -1 } }),
 | 
			
		||||
			FollowingLog.insert({
 | 
			
		||||
				createdAt: new Date(),
 | 
			
		||||
				userId: follower._id,
 | 
			
		||||
				count: follower.followingCount - 1
 | 
			
		||||
			}),
 | 
			
		||||
 | 
			
		||||
			// Decrement followers count
 | 
			
		||||
			User.update({ _id: followee._id }, { $inc: { followersCount: -1 } }),
 | 
			
		||||
			FollowedLog.insert({
 | 
			
		||||
				createdAt: new Date(),
 | 
			
		||||
				userId: followee._id,
 | 
			
		||||
				count: followee.followersCount - 1
 | 
			
		||||
			})
 | 
			
		||||
		]);
 | 
			
		||||
 | 
			
		||||
		if (isLocalUser(follower)) {
 | 
			
		||||
			return;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		const promisedPackedUser = packUser(followee, follower);
 | 
			
		||||
 | 
			
		||||
		// Publish follow event
 | 
			
		||||
		stream(follower._id, 'unfollow', promisedPackedUser);
 | 
			
		||||
	} catch (error) {
 | 
			
		||||
		Logger.error(error.toString());
 | 
			
		||||
	}
 | 
			
		||||
};
 | 
			
		||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue