From 6589e8a3909e42d8d116b06a608d5cfefd6aa7ed Mon Sep 17 00:00:00 2001 From: tamaina Date: Tue, 24 Jan 2023 15:54:14 +0900 Subject: [PATCH] Fix #9710 ? (#9712) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * wip * update pnpm-lock * use our own DevNull * fix * deliverJobConcurrencyをmacSocketsで割ってソケット数にする --- .../backend/src/core/HttpRequestService.ts | 37 ++++++++++++++++--- .../src/core/activitypub/ApRequestService.ts | 7 +++- packages/backend/src/misc/dev-null.ts | 11 ++++++ pnpm-lock.yaml | 13 +------ 4 files changed, 49 insertions(+), 19 deletions(-) create mode 100644 packages/backend/src/misc/dev-null.ts diff --git a/packages/backend/src/core/HttpRequestService.ts b/packages/backend/src/core/HttpRequestService.ts index 3a70ac77f..cd859d002 100644 --- a/packages/backend/src/core/HttpRequestService.ts +++ b/packages/backend/src/core/HttpRequestService.ts @@ -134,9 +134,34 @@ export class UndiciFetcher { return res; } + @bindThis + public async request( + url: string | URL, + options: { dispatcher?: undici.Dispatcher } & Omit & Partial> = {}, + privateOptions: { noOkError?: boolean; bypassProxy?: boolean; } = { noOkError: false, bypassProxy: false }, + ): Promise { + const res = await undici.request(url, { + dispatcher: this.getAgentByUrl(new URL(url), privateOptions.bypassProxy), + ...options, + headers: { + 'user-agent': this.userAgent ?? '', + ...(options.headers ?? {}), + }, + }).catch((err) => { + this.logger?.error(`fetch error to ${typeof url === 'string' ? url : url.href}`, err); + throw new StatusError('Resource Unreachable', 500, 'Resource Unreachable'); + }); + + if (res.statusCode >= 400) { + throw new StatusError(`${res.statusCode}`, res.statusCode, ''); + } + + return res; + } + @bindThis public async getJson(url: string, accept = 'application/json, */*', headers?: Record): Promise { - const res = await this.fetch( + const { body } = await this.request( url, { headers: Object.assign({ @@ -145,12 +170,12 @@ export class UndiciFetcher { }, ); - return await res.json() as T; + return await body.json() as T; } @bindThis public async getHtml(url: string, accept = 'text/html, */*', headers?: Record): Promise { - const res = await this.fetch( + const { body } = await this.request( url, { headers: Object.assign({ @@ -159,7 +184,7 @@ export class UndiciFetcher { }, ); - return await res.text(); + return await body.text(); } } @@ -167,6 +192,7 @@ export class UndiciFetcher { export class HttpRequestService { public defaultFetcher: UndiciFetcher; public fetch: UndiciFetcher['fetch']; + public request: UndiciFetcher['request']; public getHtml: UndiciFetcher['getHtml']; public defaultJsonFetcher: UndiciFetcher; public getJson: UndiciFetcher['getJson']; @@ -221,11 +247,12 @@ export class HttpRequestService { }, }; - this.maxSockets = Math.max(64, this.config.deliverJobConcurrency ?? 128); + this.maxSockets = Math.max(64, ((this.config.deliverJobConcurrency ?? 128) / (this.config.clusterLimit ?? 1))); this.defaultFetcher = this.createFetcher({}, {}, this.logger); this.fetch = this.defaultFetcher.fetch; + this.request = this.defaultFetcher.request; this.getHtml = this.defaultFetcher.getHtml; this.defaultJsonFetcher = this.createFetcher({ diff --git a/packages/backend/src/core/activitypub/ApRequestService.ts b/packages/backend/src/core/activitypub/ApRequestService.ts index fd7319702..db87475c4 100644 --- a/packages/backend/src/core/activitypub/ApRequestService.ts +++ b/packages/backend/src/core/activitypub/ApRequestService.ts @@ -9,10 +9,12 @@ import { HttpRequestService, UndiciFetcher } from '@/core/HttpRequestService.js' import { LoggerService } from '@/core/LoggerService.js'; import { bindThis } from '@/decorators.js'; import type Logger from '@/logger.js'; +import type { Dispatcher } from 'undici'; +import { DevNull } from '@/misc/dev-null.js'; type Request = { url: string; - method: string; + method: Dispatcher.HttpMethod; headers: Record; }; @@ -163,7 +165,7 @@ export class ApRequestService { }, }); - await this.undiciFetcher.fetch( + const response = await this.undiciFetcher.request( url, { method: req.request.method, @@ -171,6 +173,7 @@ export class ApRequestService { body, }, ); + response.body.pipe(new DevNull()); } /** diff --git a/packages/backend/src/misc/dev-null.ts b/packages/backend/src/misc/dev-null.ts new file mode 100644 index 000000000..38b9d8266 --- /dev/null +++ b/packages/backend/src/misc/dev-null.ts @@ -0,0 +1,11 @@ +import { Writable, WritableOptions } from "node:stream"; + +export class DevNull extends Writable implements NodeJS.WritableStream { + constructor(opts?: WritableOptions) { + super(opts); + } + + _write (chunk: any, encoding: BufferEncoding, cb: (err?: Error | null) => void) { + setImmediate(cb); + } +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index cdbbd3b82..3fdaf1449 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -3801,7 +3801,7 @@ packages: /axios/0.24.0: resolution: {integrity: sha512-Q6cWsys88HoPgAaFAVUb0WpPk0O8iTeisR9IMqy9G8AbO4NlpVknrnQS03zzF9PGAWgO3cgletO3VjV/P7VztA==} dependencies: - follow-redirects: 1.15.2 + follow-redirects: 1.15.2_debug@4.3.4 transitivePeerDependencies: - debug dev: false @@ -6973,16 +6973,6 @@ packages: readable-stream: 2.3.7 dev: false - /follow-redirects/1.15.2: - resolution: {integrity: sha512-VQLG33o04KaQ8uYi2tVNbdrWp1QWxNNea+nmIB4EVM28v0hmP17z7aG1+wAkNzVq4KeXTq3221ye5qTJP91JwA==} - engines: {node: '>=4.0'} - peerDependencies: - debug: '*' - peerDependenciesMeta: - debug: - optional: true - dev: false - /follow-redirects/1.15.2_debug@4.3.4: resolution: {integrity: sha512-VQLG33o04KaQ8uYi2tVNbdrWp1QWxNNea+nmIB4EVM28v0hmP17z7aG1+wAkNzVq4KeXTq3221ye5qTJP91JwA==} engines: {node: '>=4.0'} @@ -6993,7 +6983,6 @@ packages: optional: true dependencies: debug: 4.3.4 - dev: true /for-each/0.3.3: resolution: {integrity: sha512-jqYfLp7mo9vIyQf8ykW2v7A+2N4QjeCeI5+Dz9XraiO1ign81wjiH7Fb9vSOWvQfNtmSa4H2RoQTrrXivdUZmw==}