diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts index 5b7359074..48ff00c8c 100644 --- a/packages/backend/src/core/QueueService.ts +++ b/packages/backend/src/core/QueueService.ts @@ -8,7 +8,7 @@ import { DI } from '@/di-symbols.js'; import { bindThis } from '@/decorators.js'; import type { Antenna } from '@/server/api/endpoints/i/import-antennas.js'; import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, RelationshipQueue, SystemQueue, WebhookDeliverQueue } from './QueueModule.js'; -import type { DbJobData, RelationshipJobData, ThinUser } from '../queue/types.js'; +import type { DbJobData, DeliverJobData, RelationshipJobData, ThinUser } from '../queue/types.js'; import type httpSignature from '@peertube/http-signature'; import type * as Bull from 'bullmq'; @@ -69,7 +69,7 @@ export class QueueService { if (content == null) return null; if (to == null) return null; - const data = { + const data: DeliverJobData = { user: { id: user.id, }, @@ -88,6 +88,38 @@ export class QueueService { }); } + /** + * ApDeliverManager-DeliverManager.execute()からinboxesを突っ込んでaddBulkしたい + * @param user `{ id: string; }` この関数ではThinUserに変換しないので前もって変換してください + * @param content IActivity | null + * @param inboxes `Map` / key: to (inbox url), value: isSharedInbox (whether it is sharedInbox) + * @returns void + */ + @bindThis + public async deliverMany(user: ThinUser, content: IActivity | null, inboxes: Map) { + const opts = { + attempts: this.config.deliverJobMaxAttempts ?? 12, + backoff: { + type: 'custom', + }, + removeOnComplete: true, + removeOnFail: true, + }; + + await this.deliverQueue.addBulk(Array.from(inboxes.entries()).map(d => ({ + name: d[0], + data: { + user, + content, + to: d[0], + isSharedInbox: d[1], + } as DeliverJobData, + opts, + }))); + + return; + } + @bindThis public inbox(activity: IActivity, signature: httpSignature.IParsedSignature) { const data = { diff --git a/packages/backend/src/core/activitypub/ApDeliverManagerService.ts b/packages/backend/src/core/activitypub/ApDeliverManagerService.ts index 62a2a33a1..66e776118 100644 --- a/packages/backend/src/core/activitypub/ApDeliverManagerService.ts +++ b/packages/backend/src/core/activitypub/ApDeliverManagerService.ts @@ -7,6 +7,7 @@ import type { LocalUser, RemoteUser, User } from '@/models/entities/User.js'; import { QueueService } from '@/core/QueueService.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js'; import { bindThis } from '@/decorators.js'; +import { ThinUser } from '@/queue/types.js'; interface IRecipe { type: string; @@ -94,7 +95,7 @@ export class ApDeliverManagerService { } class DeliverManager { - private actor: { id: User['id']; host: null; }; + private actor: ThinUser; private activity: any; private recipes: IRecipe[] = []; @@ -111,7 +112,13 @@ class DeliverManager { actor: { id: User['id']; host: null; }, activity: any, ) { - this.actor = actor; + // 型で弾いてはいるが一応ローカルユーザーかチェック + if (actor.host != null) throw new Error('actor.host must be null'); + + // パフォーマンス向上のためキューに突っ込むのはidのみに絞る + this.actor = { + id: actor.id, + }; this.activity = activity; } @@ -155,9 +162,8 @@ class DeliverManager { */ @bindThis public async execute() { - if (!this.userEntityService.isLocalUser(this.actor)) return; - // The value flags whether it is shared or not. + // key: inbox URL, value: whether it is sharedInbox const inboxes = new Map(); /* @@ -201,9 +207,6 @@ class DeliverManager { .forEach(recipe => inboxes.set(recipe.to.inbox!, false)); // deliver - for (const inbox of inboxes) { - // inbox[0]: inbox, inbox[1]: whether it is sharedInbox - this.queueService.deliver(this.actor, this.activity, inbox[0], inbox[1]); - } + this.queueService.deliverMany(this.actor, this.activity, inboxes); } }