Qualify kind of stream
This commit is contained in:
		
							parent
							
								
									0d0ca98e13
								
							
						
					
					
						commit
						a5fd30d535
					
				
					 11 changed files with 26 additions and 25 deletions
				
			
		|  | @ -33,7 +33,7 @@ async function convert(stickerItem) { | |||
| 	if (res.status !== 200) throw new Error("Sticker data file not found.") | ||||
| 	const text = await res.text() | ||||
| 
 | ||||
| 	// Convert to PNG (readable stream)
 | ||||
| 	// Convert to PNG (stream.Readable)
 | ||||
| 	const readablePng = await convertLottie.convert(text) | ||||
| 
 | ||||
| 	// Upload to MXC
 | ||||
|  |  | |||
|  | @ -21,7 +21,7 @@ const Rlottie = (async () => { | |||
| 
 | ||||
| /** | ||||
|  * @param {string} text | ||||
|  * @returns {Promise<NodeJS.ReadableStream>} | ||||
|  * @returns {Promise<stream.Readable>} | ||||
|  */ | ||||
| async function convert(text) { | ||||
| 	const r = await Rlottie | ||||
|  | @ -41,6 +41,7 @@ async function convert(text) { | |||
| 	png.data = Buffer.from(rendered) | ||||
| 	// png.pack() is a bad stream and will throw away any data it sends if it's not connected to a destination straight away.
 | ||||
| 	// We use Duplex.from to convert it into a good stream.
 | ||||
| 	// @ts-ignore
 | ||||
| 	return stream.Duplex.from(png.pack()) | ||||
| } | ||||
| 
 | ||||
|  |  | |||
|  | @ -2,7 +2,7 @@ | |||
| 
 | ||||
| const assert = require("assert").strict | ||||
| const DiscordTypes = require("discord-api-types/v10") | ||||
| const {Readable} = require("stream") | ||||
| const stream = require("stream") | ||||
| const passthrough = require("../../passthrough") | ||||
| const {discord, db, select} = passthrough | ||||
| 
 | ||||
|  | @ -57,7 +57,7 @@ async function withWebhook(channelID, callback) { | |||
| 
 | ||||
| /** | ||||
|  * @param {string} channelID | ||||
|  * @param {DiscordTypes.RESTPostAPIWebhookWithTokenJSONBody & {files?: {name: string, file: Buffer | Readable}[]}} data | ||||
|  * @param {DiscordTypes.RESTPostAPIWebhookWithTokenJSONBody & {files?: {name: string, file: Buffer | stream.Readable}[]}} data | ||||
|  * @param {string} [threadID] | ||||
|  */ | ||||
| async function sendMessageWithWebhook(channelID, data, threadID) { | ||||
|  | @ -70,7 +70,7 @@ async function sendMessageWithWebhook(channelID, data, threadID) { | |||
| /** | ||||
|  * @param {string} channelID | ||||
|  * @param {string} messageID | ||||
|  * @param {DiscordTypes.RESTPatchAPIWebhookWithTokenMessageJSONBody & {files?: {name: string, file: Buffer | Readable}[]}} data | ||||
|  * @param {DiscordTypes.RESTPatchAPIWebhookWithTokenMessageJSONBody & {files?: {name: string, file: Buffer | stream.Readable}[]}} data | ||||
|  * @param {string} [threadID] | ||||
|  */ | ||||
| async function editMessageWithWebhook(channelID, messageID, data, threadID) { | ||||
|  |  | |||
|  | @ -1,8 +1,7 @@ | |||
| // @ts-check
 | ||||
| 
 | ||||
| const {Readable} = require("stream") | ||||
| const stream = require("stream") | ||||
| const {sync} = require("../../passthrough") | ||||
| const assert = require("assert").strict | ||||
| 
 | ||||
| /** @type {import("../converters/emoji-sheet")} */ | ||||
| const emojiSheetConverter = sync.require("../converters/emoji-sheet") | ||||
|  | @ -20,8 +19,7 @@ async function getAndConvertEmoji(mxc) { | |||
| 	// If we were using connection pooling, we would be forced to download the entire GIF.
 | ||||
| 	// So we set no agent to ensure we are not connection pooling.
 | ||||
| 	const res = await api.getMedia(mxc, {signal: abortController.signal}) | ||||
| 	// @ts-ignore
 | ||||
| 	const readable = Readable.fromWeb(res.body) | ||||
| 	const readable = stream.Readable.fromWeb(res.body) | ||||
| 	return emojiSheetConverter.convertImageStream(readable, () => { | ||||
| 		abortController.abort() | ||||
| 		readable.emit("end") | ||||
|  |  | |||
|  | @ -2,7 +2,7 @@ | |||
| 
 | ||||
| const Ty = require("../../types") | ||||
| const DiscordTypes = require("discord-api-types/v10") | ||||
| const {Readable} = require("stream") | ||||
| const stream = require("stream") | ||||
| const assert = require("assert").strict | ||||
| const crypto = require("crypto") | ||||
| const passthrough = require("../../passthrough") | ||||
|  | @ -22,8 +22,8 @@ const editMessage = sync.require("../../d2m/actions/edit-message") | |||
| const emojiSheet = sync.require("../actions/emoji-sheet") | ||||
| 
 | ||||
| /** | ||||
|  * @param {DiscordTypes.RESTPostAPIWebhookWithTokenJSONBody & {files?: {name: string, file: Buffer | Readable}[], pendingFiles?: ({name: string, mxc: string} | {name: string, mxc: string, key: string, iv: string} | {name: string, buffer: Buffer | Readable})[]}} message | ||||
|  * @returns {Promise<DiscordTypes.RESTPostAPIWebhookWithTokenJSONBody & {files?: {name: string, file: Buffer | Readable}[]}>} | ||||
|  * @param {DiscordTypes.RESTPostAPIWebhookWithTokenJSONBody & {files?: {name: string, file: Buffer | stream.Readable}[], pendingFiles?: ({name: string, mxc: string} | {name: string, mxc: string, key: string, iv: string} | {name: string, buffer: Buffer | stream.Readable})[]}} message | ||||
|  * @returns {Promise<DiscordTypes.RESTPostAPIWebhookWithTokenJSONBody & {files?: {name: string, file: Buffer | stream.Readable}[]}>} | ||||
|  */ | ||||
| async function resolvePendingFiles(message) { | ||||
| 	if (!message.pendingFiles) return message | ||||
|  | @ -37,15 +37,14 @@ async function resolvePendingFiles(message) { | |||
| 		if ("key" in p) { | ||||
| 			// Encrypted file
 | ||||
| 			const d = crypto.createDecipheriv("aes-256-ctr", Buffer.from(p.key, "base64url"), Buffer.from(p.iv, "base64url")) | ||||
| 			await api.getMedia(p.mxc).then(res => Readable.fromWeb(res.body).pipe(d)) | ||||
| 			await api.getMedia(p.mxc).then(res => stream.Readable.fromWeb(res.body).pipe(d)) | ||||
| 			return { | ||||
| 				name: p.name, | ||||
| 				file: d | ||||
| 			} | ||||
| 		} else { | ||||
| 			// Unencrypted file
 | ||||
| 			/** @type {Readable} */ | ||||
| 			const body = await api.getMedia(p.mxc).then(res => Readable.fromWeb(res.body)) | ||||
| 			const body = await api.getMedia(p.mxc).then(res => stream.Readable.fromWeb(res.body)) | ||||
| 			return { | ||||
| 				name: p.name, | ||||
| 				file: body | ||||
|  |  | |||
|  | @ -1,6 +1,7 @@ | |||
| // @ts-check
 | ||||
| 
 | ||||
| const assert = require("assert").strict | ||||
| const stream = require("stream") | ||||
| const {pipeline} = require("stream").promises | ||||
| const sharp = require("sharp") | ||||
| const {GIFrame} = require("@cloudrac3r/giframe") | ||||
|  | @ -48,7 +49,7 @@ async function compositeMatrixEmojis(mxcs, mxcDownloader) { | |||
| } | ||||
| 
 | ||||
| /** | ||||
|  * @param {NodeJS.ReadableStream} streamIn | ||||
|  * @param {stream.Readable} streamIn | ||||
|  * @param {() => any} stopStream | ||||
|  * @returns {Promise<Buffer | undefined>} Uncompressed PNG image | ||||
|  */ | ||||
|  |  | |||
|  | @ -2,7 +2,7 @@ | |||
| 
 | ||||
| const Ty = require("../../types") | ||||
| const DiscordTypes = require("discord-api-types/v10") | ||||
| const {Readable} = require("stream") | ||||
| const stream = require("stream") | ||||
| const chunk = require("chunk-text") | ||||
| const TurndownService = require("@cloudrac3r/turndown") | ||||
| const domino = require("domino") | ||||
|  | @ -820,7 +820,7 @@ async function eventToMessage(event, guild, di) { | |||
| 
 | ||||
| 	// Split into 2000 character chunks
 | ||||
| 	const chunks = chunk(content, 2000) | ||||
| 	/** @type {(DiscordTypes.RESTPostAPIWebhookWithTokenJSONBody & {files?: {name: string, file: Buffer | Readable}[]})[]} */ | ||||
| 	/** @type {(DiscordTypes.RESTPostAPIWebhookWithTokenJSONBody & {files?: {name: string, file: Buffer | stream.Readable}[]})[]} */ | ||||
| 	const messages = chunks.map(content => ({ | ||||
| 		content, | ||||
| 		allowed_mentions: { | ||||
|  |  | |||
|  | @ -2,6 +2,7 @@ | |||
| 
 | ||||
| const Ty = require("../types") | ||||
| const assert = require("assert").strict | ||||
| const streamWeb = require("stream/web") | ||||
| 
 | ||||
| const passthrough = require("../passthrough") | ||||
| const {sync} = passthrough | ||||
|  | @ -343,7 +344,7 @@ async function ping() { | |||
| /** | ||||
|  * @param {string} mxc | ||||
|  * @param {RequestInit} [init] | ||||
|  * @return {Promise<Response & {body: import("stream/web").ReadableStream<Uint8Array>}>} | ||||
|  * @return {Promise<Response & {body: streamWeb.ReadableStream<Uint8Array>}>} | ||||
|  */ | ||||
| async function getMedia(mxc, init = {}) { | ||||
| 	const mediaParts = mxc?.match(/^mxc:\/\/([^/]+)\/(\w+)$/) | ||||
|  |  | |||
|  | @ -2,7 +2,7 @@ | |||
| 
 | ||||
| const mixin = require("@cloudrac3r/mixin-deep") | ||||
| const stream = require("stream") | ||||
| const {ReadableStream} = require("stream/web") | ||||
| const streamWeb = require("stream/web") | ||||
| const getStream = require("get-stream") | ||||
| 
 | ||||
| const {reg, writeRegistration} = require("./read-registration.js") | ||||
|  | @ -22,7 +22,7 @@ class MatrixServerError extends Error { | |||
| /** | ||||
|  * @param {string} method | ||||
|  * @param {string} url | ||||
|  * @param {string | object | ReadableStream | stream.Readable} [body] | ||||
|  * @param {string | object | streamWeb.ReadableStream | stream.Readable} [body] | ||||
|  * @param {any} [extra] | ||||
|  */ | ||||
| async function mreq(method, url, body, extra = {}) { | ||||
|  | @ -30,7 +30,7 @@ async function mreq(method, url, body, extra = {}) { | |||
| 		body = JSON.stringify(body) | ||||
| 	} else if (body instanceof stream.Readable && reg.ooye.content_length_workaround) { | ||||
| 		body = await getStream.buffer(body) | ||||
| 	} else if (body instanceof ReadableStream && reg.ooye.content_length_workaround) { | ||||
| 	} else if (body instanceof streamWeb.ReadableStream && reg.ooye.content_length_workaround) { | ||||
| 		body = await stream.consumers.buffer(stream.Readable.fromWeb(body)) | ||||
| 	} | ||||
| 
 | ||||
|  |  | |||
|  | @ -1,5 +1,6 @@ | |||
| // @ts-check
 | ||||
| 
 | ||||
| const assert = require("assert") | ||||
| const fs = require("fs") | ||||
| const {join} = require("path") | ||||
| const h3 = require("h3") | ||||
|  | @ -38,8 +39,8 @@ function compressResponse(event, response) { | |||
| 	if (!getRequestHeader(event, "accept-encoding")?.includes("gzip")) return | ||||
| 	/* c8 ignore next */ | ||||
| 	if (typeof response.body !== "string") return | ||||
| 	/** @type {ReadableStream} */ // @ts-ignore
 | ||||
| 	const stream = new Response(response.body).body | ||||
| 	assert(stream) | ||||
| 	setResponseHeader(event, "content-encoding", "gzip") | ||||
| 	response.body = stream.pipeThrough(new CompressionStream("gzip")) | ||||
| } | ||||
|  |  | |||
|  | @ -1,6 +1,6 @@ | |||
| // @ts-check
 | ||||
| 
 | ||||
| const {ReadableStream} = require("stream/web") | ||||
| const streamWeb = require("stream/web") | ||||
| const {test} = require("supertape") | ||||
| const {router} = require("../../test/web") | ||||
| const assert = require("assert").strict | ||||
|  | @ -30,7 +30,7 @@ test("web server: compresses static resources", async t => { | |||
| 			"accept-encoding": "gzip" | ||||
| 		} | ||||
| 	}) | ||||
| 	assert(content instanceof ReadableStream) | ||||
| 	assert(content instanceof streamWeb.ReadableStream) | ||||
| 	const firstChunk = await content.getReader().read() | ||||
| 	t.ok(firstChunk.value instanceof Uint8Array, "can get data") | ||||
| 	t.deepEqual(firstChunk.value.slice(0, 3), Uint8Array.from([31, 139, 8]), "has compressed gzip header") | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue