From c934987b14dc2b0c362f2d13e0664ab275aca522 Mon Sep 17 00:00:00 2001 From: syuilo Date: Thu, 7 Mar 2019 23:07:21 +0900 Subject: [PATCH] Resolve #4444 --- package.json | 3 +- src/argv.ts | 4 +- src/queue/index.ts | 149 ++++++++---------- .../processors/{ => db}/delete-drive-files.ts | 16 +- src/queue/processors/{ => db}/delete-notes.ts | 16 +- .../processors/{ => db}/export-blocking.ts | 18 +-- .../processors/{ => db}/export-following.ts | 18 +-- src/queue/processors/{ => db}/export-mute.ts | 18 +-- src/queue/processors/{ => db}/export-notes.ts | 16 +- src/queue/processors/{ => db}/index.ts | 23 +-- src/queue/processors/{http => }/deliver.ts | 15 +- .../processors/{http => }/process-inbox.ts | 22 +-- src/server/activitypub.ts | 2 +- 13 files changed, 146 insertions(+), 174 deletions(-) rename src/queue/processors/{ => db}/delete-drive-files.ts (67%) rename src/queue/processors/{ => db}/delete-notes.ts (68%) rename src/queue/processors/{ => db}/export-blocking.ts (80%) rename src/queue/processors/{ => db}/export-following.ts (80%) rename src/queue/processors/{ => db}/export-mute.ts (80%) rename src/queue/processors/{ => db}/export-notes.ts (85%) rename src/queue/processors/{ => db}/index.ts (50%) rename src/queue/processors/{http => }/deliver.ts (78%) rename src/queue/processors/{http => }/process-inbox.ts (88%) diff --git a/package.json b/package.json index 699d7e95a..b42ef167c 100644 --- a/package.json +++ b/package.json @@ -32,6 +32,7 @@ "@prezzemolo/rap": "0.1.2", "@prezzemolo/zip": "0.0.3", "@types/bcryptjs": "2.4.2", + "@types/bull": "3.5.8", "@types/chai-http": "3.0.5", "@types/dateformat": "3.0.0", "@types/deep-equal": "1.0.1", @@ -100,8 +101,8 @@ "autosize": "4.0.2", "autwh": "0.1.0", "bcryptjs": "2.4.3", - "bee-queue": "1.2.2", "bootstrap-vue": "2.0.0-rc.13", + "bull": "3.7.0", "cafy": "15.1.0", "chai": "4.2.0", "chai-http": "4.2.1", diff --git a/src/argv.ts b/src/argv.ts index d9cd12dfc..b5540441c 100644 --- a/src/argv.ts +++ b/src/argv.ts @@ -5,8 +5,7 @@ program .version(pkg.version) .option('--no-daemons', 'Disable daemon processes (for debbuging)') .option('--disable-clustering', 'Disable clustering') - .option('--disable-queue', 'Disable job queue processing') - .option('--only-server', 'Run server only (without job queue)') + .option('--only-server', 'Run server only (without job queue processing)') .option('--only-queue', 'Pocessing job queue only (without server)') .option('--quiet', 'Suppress all logs') .option('--verbose', 'Enable all logs') @@ -15,7 +14,6 @@ program .option('--color', 'This option is a dummy for some external program\'s (e.g. forever) issue.') .parse(process.argv); -/*if (process.env.MK_DISABLE_QUEUE)*/ program.disableQueue = true; if (process.env.MK_ONLY_QUEUE) program.onlyQueue = true; export { program }; diff --git a/src/queue/index.ts b/src/queue/index.ts index 351a035ad..bb3b66908 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -1,73 +1,64 @@ -import * as Queue from 'bee-queue'; +import * as Queue from 'bull'; import * as httpSignature from 'http-signature'; import config from '../config'; import { ILocalUser } from '../models/user'; import { program } from '../argv'; -import handler from './processors'; -import { queueLogger } from './logger'; -const enableQueue = !program.disableQueue; -const enableQueueProcessing = !program.onlyServer && enableQueue; -const queueAvailable = config.redis != null; +import processDeliver from './processors/deliver'; +import processInbox from './processors/process-inbox'; +import processDb from './processors/db'; -const queue = initializeQueue(); - -function initializeQueue() { - if (queueAvailable && enableQueue) { - return new Queue('misskey-queue', { - redis: { - port: config.redis.port, - host: config.redis.host, - password: config.redis.pass - }, - - removeOnSuccess: true, - removeOnFailure: true, - getEvents: false, - sendEvents: false, - storeJobs: false - }); - } else { - return null; - } +function initializeQueue(name: string) { + return new Queue(name, config.redis != null ? { + redis: { + port: config.redis.port, + host: config.redis.host, + password: config.redis.pass, + db: 1 + } + } : null); } +const deliverQueue = initializeQueue('deliver'); +const inboxQueue = initializeQueue('inbox'); +const dbQueue = initializeQueue('db'); + export function deliver(user: ILocalUser, content: any, to: any) { - if (content == null) return; + if (content == null) return null; const data = { - type: 'deliver', user, content, to }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data) - .retries(8) - .backoff('exponential', 1000) - .save(); - } else { - return handler({ data }, () => {}); - } + return deliverQueue.add(data, { + attempts: 4, + backoff: { + type: 'exponential', + delay: 1000 + }, + removeOnComplete: true, + removeOnFail: true + }); } -export function processInbox(activity: any, signature: httpSignature.IParsedSignature) { +export function inbox(activity: any, signature: httpSignature.IParsedSignature) { const data = { - type: 'processInbox', activity: activity, signature }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data) - .retries(3) - .backoff('exponential', 500) - .save(); - } else { - return handler({ data }, () => {}); - } + return inboxQueue.add(data, { + attempts: 4, + backoff: { + type: 'exponential', + delay: 1000 + }, + removeOnComplete: true, + removeOnFail: true + }); } export function createDeleteNotesJob(user: ILocalUser) { @@ -76,11 +67,10 @@ export function createDeleteNotesJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export function createDeleteDriveFilesJob(user: ILocalUser) { @@ -89,11 +79,10 @@ export function createDeleteDriveFilesJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export function createExportNotesJob(user: ILocalUser) { @@ -102,11 +91,10 @@ export function createExportNotesJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export function createExportFollowingJob(user: ILocalUser) { @@ -115,11 +103,10 @@ export function createExportFollowingJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export function createExportMuteJob(user: ILocalUser) { @@ -128,11 +115,10 @@ export function createExportMuteJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export function createExportBlockingJob(user: ILocalUser) { @@ -141,24 +127,23 @@ export function createExportBlockingJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export default function() { - if (queueAvailable && enableQueueProcessing) { - queue.process(128, handler); - queueLogger.succ('Processing started'); + if (!program.onlyServer) { + deliverQueue.process(processDeliver); + inboxQueue.process(processInbox); + dbQueue.process(processDb); } - - return queue; } export function destroy() { + /* queue.destroy().then(n => { queueLogger.succ(`All job removed (${n} jobs)`); - }); + });*/ } diff --git a/src/queue/processors/delete-drive-files.ts b/src/queue/processors/db/delete-drive-files.ts similarity index 67% rename from src/queue/processors/delete-drive-files.ts rename to src/queue/processors/db/delete-drive-files.ts index 7e76aa73e..3de960a25 100644 --- a/src/queue/processors/delete-drive-files.ts +++ b/src/queue/processors/db/delete-drive-files.ts @@ -1,14 +1,14 @@ -import * as bq from 'bee-queue'; +import * as Bull from 'bull'; import * as mongo from 'mongodb'; -import { queueLogger } from '../logger'; -import User from '../../models/user'; -import DriveFile from '../../models/drive-file'; -import deleteFile from '../../services/drive/delete-file'; +import { queueLogger } from '../../logger'; +import User from '../../../models/user'; +import DriveFile from '../../../models/drive-file'; +import deleteFile from '../../../services/drive/delete-file'; const logger = queueLogger.createSubLogger('delete-drive-files'); -export async function deleteDriveFiles(job: bq.Job, done: any): Promise { +export async function deleteDriveFiles(job: Bull.Job, done: any): Promise { logger.info(`Deleting drive files of ${job.data.user._id} ...`); const user = await User.findOne({ @@ -32,7 +32,7 @@ export async function deleteDriveFiles(job: bq.Job, done: any): Promise { if (files.length === 0) { ended = true; - if (job.reportProgress) job.reportProgress(100); + job.progress(100); break; } @@ -47,7 +47,7 @@ export async function deleteDriveFiles(job: bq.Job, done: any): Promise { userId: user._id, }); - if (job.reportProgress) job.reportProgress(deletedCount / total); + job.progress(deletedCount / total); } logger.succ(`All drive files (${deletedCount}) of ${user._id} has been deleted.`); diff --git a/src/queue/processors/delete-notes.ts b/src/queue/processors/db/delete-notes.ts similarity index 68% rename from src/queue/processors/delete-notes.ts rename to src/queue/processors/db/delete-notes.ts index 13c6042b1..021db8062 100644 --- a/src/queue/processors/delete-notes.ts +++ b/src/queue/processors/db/delete-notes.ts @@ -1,14 +1,14 @@ -import * as bq from 'bee-queue'; +import * as Bull from 'bull'; import * as mongo from 'mongodb'; -import { queueLogger } from '../logger'; -import Note from '../../models/note'; -import deleteNote from '../../services/note/delete'; -import User from '../../models/user'; +import { queueLogger } from '../../logger'; +import Note from '../../../models/note'; +import deleteNote from '../../../services/note/delete'; +import User from '../../../models/user'; const logger = queueLogger.createSubLogger('delete-notes'); -export async function deleteNotes(job: bq.Job, done: any): Promise { +export async function deleteNotes(job: Bull.Job, done: any): Promise { logger.info(`Deleting notes of ${job.data.user._id} ...`); const user = await User.findOne({ @@ -32,7 +32,7 @@ export async function deleteNotes(job: bq.Job, done: any): Promise { if (notes.length === 0) { ended = true; - if (job.reportProgress) job.reportProgress(100); + job.progress(100); break; } @@ -47,7 +47,7 @@ export async function deleteNotes(job: bq.Job, done: any): Promise { userId: user._id, }); - if (job.reportProgress) job.reportProgress(deletedCount / total); + job.progress(deletedCount / total); } logger.succ(`All notes (${deletedCount}) of ${user._id} has been deleted.`); diff --git a/src/queue/processors/export-blocking.ts b/src/queue/processors/db/export-blocking.ts similarity index 80% rename from src/queue/processors/export-blocking.ts rename to src/queue/processors/db/export-blocking.ts index b30d8e3bc..e56aec94a 100644 --- a/src/queue/processors/export-blocking.ts +++ b/src/queue/processors/db/export-blocking.ts @@ -1,18 +1,18 @@ -import * as bq from 'bee-queue'; +import * as Bull from 'bull'; import * as tmp from 'tmp'; import * as fs from 'fs'; import * as mongo from 'mongodb'; -import { queueLogger } from '../logger'; -import addFile from '../../services/drive/add-file'; -import User from '../../models/user'; +import { queueLogger } from '../../logger'; +import addFile from '../../../services/drive/add-file'; +import User from '../../../models/user'; import dateFormat = require('dateformat'); -import Blocking from '../../models/blocking'; -import config from '../../config'; +import Blocking from '../../../models/blocking'; +import config from '../../../config'; const logger = queueLogger.createSubLogger('export-blocking'); -export async function exportBlocking(job: bq.Job, done: any): Promise { +export async function exportBlocking(job: Bull.Job, done: any): Promise { logger.info(`Exporting blocking of ${job.data.user._id} ...`); const user = await User.findOne({ @@ -48,7 +48,7 @@ export async function exportBlocking(job: bq.Job, done: any): Promise { if (blockings.length === 0) { ended = true; - if (job.reportProgress) job.reportProgress(100); + job.progress(100); break; } @@ -74,7 +74,7 @@ export async function exportBlocking(job: bq.Job, done: any): Promise { blockerId: user._id, }); - if (job.reportProgress) job.reportProgress(exportedCount / total); + job.progress(exportedCount / total); } stream.end(); diff --git a/src/queue/processors/export-following.ts b/src/queue/processors/db/export-following.ts similarity index 80% rename from src/queue/processors/export-following.ts rename to src/queue/processors/db/export-following.ts index e6521d065..1d8a501b7 100644 --- a/src/queue/processors/export-following.ts +++ b/src/queue/processors/db/export-following.ts @@ -1,18 +1,18 @@ -import * as bq from 'bee-queue'; +import * as Bull from 'bull'; import * as tmp from 'tmp'; import * as fs from 'fs'; import * as mongo from 'mongodb'; -import { queueLogger } from '../logger'; -import addFile from '../../services/drive/add-file'; -import User from '../../models/user'; +import { queueLogger } from '../../logger'; +import addFile from '../../../services/drive/add-file'; +import User from '../../../models/user'; import dateFormat = require('dateformat'); -import Following from '../../models/following'; -import config from '../../config'; +import Following from '../../../models/following'; +import config from '../../../config'; const logger = queueLogger.createSubLogger('export-following'); -export async function exportFollowing(job: bq.Job, done: any): Promise { +export async function exportFollowing(job: Bull.Job, done: any): Promise { logger.info(`Exporting following of ${job.data.user._id} ...`); const user = await User.findOne({ @@ -48,7 +48,7 @@ export async function exportFollowing(job: bq.Job, done: any): Promise { if (followings.length === 0) { ended = true; - if (job.reportProgress) job.reportProgress(100); + job.progress(100); break; } @@ -74,7 +74,7 @@ export async function exportFollowing(job: bq.Job, done: any): Promise { followerId: user._id, }); - if (job.reportProgress) job.reportProgress(exportedCount / total); + job.progress(exportedCount / total); } stream.end(); diff --git a/src/queue/processors/export-mute.ts b/src/queue/processors/db/export-mute.ts similarity index 80% rename from src/queue/processors/export-mute.ts rename to src/queue/processors/db/export-mute.ts index 74456c1da..6f2dd6df1 100644 --- a/src/queue/processors/export-mute.ts +++ b/src/queue/processors/db/export-mute.ts @@ -1,18 +1,18 @@ -import * as bq from 'bee-queue'; +import * as Bull from 'bull'; import * as tmp from 'tmp'; import * as fs from 'fs'; import * as mongo from 'mongodb'; -import { queueLogger } from '../logger'; -import addFile from '../../services/drive/add-file'; -import User from '../../models/user'; +import { queueLogger } from '../../logger'; +import addFile from '../../../services/drive/add-file'; +import User from '../../../models/user'; import dateFormat = require('dateformat'); -import Mute from '../../models/mute'; -import config from '../../config'; +import Mute from '../../../models/mute'; +import config from '../../../config'; const logger = queueLogger.createSubLogger('export-mute'); -export async function exportMute(job: bq.Job, done: any): Promise { +export async function exportMute(job: Bull.Job, done: any): Promise { logger.info(`Exporting mute of ${job.data.user._id} ...`); const user = await User.findOne({ @@ -48,7 +48,7 @@ export async function exportMute(job: bq.Job, done: any): Promise { if (mutes.length === 0) { ended = true; - if (job.reportProgress) job.reportProgress(100); + job.progress(100); break; } @@ -74,7 +74,7 @@ export async function exportMute(job: bq.Job, done: any): Promise { muterId: user._id, }); - if (job.reportProgress) job.reportProgress(exportedCount / total); + job.progress(exportedCount / total); } stream.end(); diff --git a/src/queue/processors/export-notes.ts b/src/queue/processors/db/export-notes.ts similarity index 85% rename from src/queue/processors/export-notes.ts rename to src/queue/processors/db/export-notes.ts index 32e4cd1d6..8f3cdc5b9 100644 --- a/src/queue/processors/export-notes.ts +++ b/src/queue/processors/db/export-notes.ts @@ -1,17 +1,17 @@ -import * as bq from 'bee-queue'; +import * as Bull from 'bull'; import * as tmp from 'tmp'; import * as fs from 'fs'; import * as mongo from 'mongodb'; -import { queueLogger } from '../logger'; -import Note, { INote } from '../../models/note'; -import addFile from '../../services/drive/add-file'; -import User from '../../models/user'; +import { queueLogger } from '../../logger'; +import Note, { INote } from '../../../models/note'; +import addFile from '../../../services/drive/add-file'; +import User from '../../../models/user'; import dateFormat = require('dateformat'); const logger = queueLogger.createSubLogger('export-notes'); -export async function exportNotes(job: bq.Job, done: any): Promise { +export async function exportNotes(job: Bull.Job, done: any): Promise { logger.info(`Exporting notes of ${job.data.user._id} ...`); const user = await User.findOne({ @@ -58,7 +58,7 @@ export async function exportNotes(job: bq.Job, done: any): Promise { if (notes.length === 0) { ended = true; - if (job.reportProgress) job.reportProgress(100); + job.progress(100); break; } @@ -83,7 +83,7 @@ export async function exportNotes(job: bq.Job, done: any): Promise { userId: user._id, }); - if (job.reportProgress) job.reportProgress(exportedNotesCount / total); + job.progress(exportedNotesCount / total); } await new Promise((res, rej) => { diff --git a/src/queue/processors/index.ts b/src/queue/processors/db/index.ts similarity index 50% rename from src/queue/processors/index.ts rename to src/queue/processors/db/index.ts index 31e87c3f6..91d7f06a4 100644 --- a/src/queue/processors/index.ts +++ b/src/queue/processors/db/index.ts @@ -1,31 +1,20 @@ -import deliver from './http/deliver'; -import processInbox from './http/process-inbox'; +import * as Bull from 'bull'; import { deleteNotes } from './delete-notes'; import { deleteDriveFiles } from './delete-drive-files'; import { exportNotes } from './export-notes'; import { exportFollowing } from './export-following'; import { exportMute } from './export-mute'; import { exportBlocking } from './export-blocking'; -import { queueLogger } from '../logger'; -const handlers: any = { - deliver, - processInbox, +const jobs = { deleteNotes, deleteDriveFiles, exportNotes, exportFollowing, exportMute, exportBlocking, -}; +} as any; -export default (job: any, done: any) => { - const handler = handlers[job.data.type]; - - if (handler) { - handler(job, done); - } else { - queueLogger.error(`Unknown job: ${job.data.type}`); - done(); - } -}; +export default function(job: Bull.Job, done: any) { + jobs[job.data.type](job, done); +} diff --git a/src/queue/processors/http/deliver.ts b/src/queue/processors/deliver.ts similarity index 78% rename from src/queue/processors/http/deliver.ts rename to src/queue/processors/deliver.ts index 96f6cc07c..2f730c31c 100644 --- a/src/queue/processors/http/deliver.ts +++ b/src/queue/processors/deliver.ts @@ -1,14 +1,13 @@ -import * as bq from 'bee-queue'; - -import request from '../../../remote/activitypub/request'; -import { queueLogger } from '../../logger'; -import { registerOrFetchInstanceDoc } from '../../../services/register-or-fetch-instance-doc'; -import Instance from '../../../models/instance'; -import instanceChart from '../../../services/chart/instance'; +import * as Bull from 'bull'; +import request from '../../remote/activitypub/request'; +import { queueLogger } from '../logger'; +import { registerOrFetchInstanceDoc } from '../../services/register-or-fetch-instance-doc'; +import Instance from '../../models/instance'; +import instanceChart from '../../services/chart/instance'; let latest: string = null; -export default async (job: bq.Job, done: any): Promise => { +export default async (job: Bull.Job, done: any): Promise => { const { host } = new URL(job.data.to); try { diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/process-inbox.ts similarity index 88% rename from src/queue/processors/http/process-inbox.ts rename to src/queue/processors/process-inbox.ts index cc4e711d0..21292921f 100644 --- a/src/queue/processors/http/process-inbox.ts +++ b/src/queue/processors/process-inbox.ts @@ -1,21 +1,21 @@ -import * as bq from 'bee-queue'; +import * as Bull from 'bull'; import * as httpSignature from 'http-signature'; -import parseAcct from '../../../misc/acct/parse'; -import User, { IRemoteUser } from '../../../models/user'; -import perform from '../../../remote/activitypub/perform'; -import { resolvePerson, updatePerson } from '../../../remote/activitypub/models/person'; +import parseAcct from '../../misc/acct/parse'; +import User, { IRemoteUser } from '../../models/user'; +import perform from '../../remote/activitypub/perform'; +import { resolvePerson, updatePerson } from '../../remote/activitypub/models/person'; import { toUnicode } from 'punycode'; import { URL } from 'url'; -import { publishApLogStream } from '../../../services/stream'; -import Logger from '../../../services/logger'; -import { registerOrFetchInstanceDoc } from '../../../services/register-or-fetch-instance-doc'; -import Instance from '../../../models/instance'; -import instanceChart from '../../../services/chart/instance'; +import { publishApLogStream } from '../../services/stream'; +import Logger from '../../services/logger'; +import { registerOrFetchInstanceDoc } from '../../services/register-or-fetch-instance-doc'; +import Instance from '../../models/instance'; +import instanceChart from '../../services/chart/instance'; const logger = new Logger('inbox'); // ユーザーのinboxにアクティビティが届いた時の処理 -export default async (job: bq.Job, done: any): Promise => { +export default async (job: Bull.Job, done: any): Promise => { const signature = job.data.signature; const activity = job.data.activity; diff --git a/src/server/activitypub.ts b/src/server/activitypub.ts index 24c8e556b..2151ada34 100644 --- a/src/server/activitypub.ts +++ b/src/server/activitypub.ts @@ -16,7 +16,7 @@ import Followers from './activitypub/followers'; import Following from './activitypub/following'; import Featured from './activitypub/featured'; import renderQuestion from '../remote/activitypub/renderer/question'; -import { processInbox } from '../queue'; +import { inbox as processInbox } from '../queue'; // Init router const router = new Router();