diff --git a/src/argv.ts b/src/argv.ts index 31325d138d..c13c538890 100644 --- a/src/argv.ts +++ b/src/argv.ts @@ -5,6 +5,7 @@ program .version(pkg.version) .option('--no-daemons', 'Disable daemon processes (for debbuging)') .option('--disable-clustering', 'Disable clustering') + .option('--disable-ap-queue', 'Disable creating job queue related to ap') .option('--disable-queue', 'Disable job queue processing') .option('--only-queue', 'Pocessing job queue only') .option('--quiet', 'Suppress all logs') @@ -13,6 +14,7 @@ program .option('--color', 'This option is a dummy for some external program\'s (e.g. forever) issue.') .parse(process.argv); +if (process.env.MK_DISABLE_AP_QUEUE) program.disableApQueue = true; if (process.env.MK_DISABLE_QUEUE) program.disableQueue = true; if (process.env.MK_ONLY_QUEUE) program.onlyQueue = true; diff --git a/src/queue/index.ts b/src/queue/index.ts index 54c0c13ae4..8105999b60 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -1,6 +1,7 @@ import * as Queue from 'bee-queue'; -import config from '../config'; +import * as httpSignature from 'http-signature'; +import config from '../config'; import { ILocalUser } from '../models/user'; import { program } from '../argv'; import handler from './processors'; @@ -31,10 +32,19 @@ function initializeQueue() { } } -export function createHttpJob(data: any) { - if (queueAvailable) { +export function deliver(user: ILocalUser, content: any, to: any) { + if (content == null) return; + + const data = { + type: 'deliver', + user, + content, + to + }; + + if (queueAvailable && !program.disableApQueue) { return queue.createJob(data) - .retries(3) + .retries(8) .backoff('exponential', 1000) .save(); } else { @@ -42,15 +52,21 @@ export function createHttpJob(data: any) { } } -export function deliver(user: ILocalUser, content: any, to: any) { - if (content == null) return; +export function processInbox(activity: any, signature: httpSignature.IParsedSignature) { + const data = { + type: 'processInbox', + activity: activity, + signature + }; - createHttpJob({ - type: 'deliver', - user, - content, - to - }); + if (queueAvailable && !program.disableApQueue) { + return queue.createJob(data) + .retries(3) + .backoff('exponential', 500) + .save(); + } else { + return handler({ data }, () => {}); + } } export function createExportNotesJob(user: ILocalUser) { diff --git a/src/server/activitypub.ts b/src/server/activitypub.ts index 0c0ae6c529..24c8e556b6 100644 --- a/src/server/activitypub.ts +++ b/src/server/activitypub.ts @@ -3,7 +3,6 @@ import * as Router from 'koa-router'; import * as json from 'koa-json-body'; import * as httpSignature from 'http-signature'; -import { createHttpJob } from '../queue'; import { renderActivity } from '../remote/activitypub/renderer'; import Note from '../models/note'; import User, { isLocalUser, ILocalUser, IUser } from '../models/user'; @@ -17,6 +16,7 @@ import Followers from './activitypub/followers'; import Following from './activitypub/following'; import Featured from './activitypub/featured'; import renderQuestion from '../remote/activitypub/renderer/question'; +import { processInbox } from '../queue'; // Init router const router = new Router(); @@ -35,11 +35,7 @@ function inbox(ctx: Router.IRouterContext) { return; } - createHttpJob({ - type: 'processInbox', - activity: ctx.request.body, - signature - }); + processInbox(ctx.request.body, signature); ctx.status = 202; }