forked from cadence/out-of-your-element
Qualify kind of stream
This commit is contained in:
parent
0d0ca98e13
commit
a5fd30d535
11 changed files with 26 additions and 25 deletions
|
@ -33,7 +33,7 @@ async function convert(stickerItem) {
|
||||||
if (res.status !== 200) throw new Error("Sticker data file not found.")
|
if (res.status !== 200) throw new Error("Sticker data file not found.")
|
||||||
const text = await res.text()
|
const text = await res.text()
|
||||||
|
|
||||||
// Convert to PNG (readable stream)
|
// Convert to PNG (stream.Readable)
|
||||||
const readablePng = await convertLottie.convert(text)
|
const readablePng = await convertLottie.convert(text)
|
||||||
|
|
||||||
// Upload to MXC
|
// Upload to MXC
|
||||||
|
|
|
@ -21,7 +21,7 @@ const Rlottie = (async () => {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {string} text
|
* @param {string} text
|
||||||
* @returns {Promise<NodeJS.ReadableStream>}
|
* @returns {Promise<stream.Readable>}
|
||||||
*/
|
*/
|
||||||
async function convert(text) {
|
async function convert(text) {
|
||||||
const r = await Rlottie
|
const r = await Rlottie
|
||||||
|
@ -41,6 +41,7 @@ async function convert(text) {
|
||||||
png.data = Buffer.from(rendered)
|
png.data = Buffer.from(rendered)
|
||||||
// png.pack() is a bad stream and will throw away any data it sends if it's not connected to a destination straight away.
|
// png.pack() is a bad stream and will throw away any data it sends if it's not connected to a destination straight away.
|
||||||
// We use Duplex.from to convert it into a good stream.
|
// We use Duplex.from to convert it into a good stream.
|
||||||
|
// @ts-ignore
|
||||||
return stream.Duplex.from(png.pack())
|
return stream.Duplex.from(png.pack())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
const assert = require("assert").strict
|
const assert = require("assert").strict
|
||||||
const DiscordTypes = require("discord-api-types/v10")
|
const DiscordTypes = require("discord-api-types/v10")
|
||||||
const {Readable} = require("stream")
|
const stream = require("stream")
|
||||||
const passthrough = require("../../passthrough")
|
const passthrough = require("../../passthrough")
|
||||||
const {discord, db, select} = passthrough
|
const {discord, db, select} = passthrough
|
||||||
|
|
||||||
|
@ -57,7 +57,7 @@ async function withWebhook(channelID, callback) {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {string} channelID
|
* @param {string} channelID
|
||||||
* @param {DiscordTypes.RESTPostAPIWebhookWithTokenJSONBody & {files?: {name: string, file: Buffer | Readable}[]}} data
|
* @param {DiscordTypes.RESTPostAPIWebhookWithTokenJSONBody & {files?: {name: string, file: Buffer | stream.Readable}[]}} data
|
||||||
* @param {string} [threadID]
|
* @param {string} [threadID]
|
||||||
*/
|
*/
|
||||||
async function sendMessageWithWebhook(channelID, data, threadID) {
|
async function sendMessageWithWebhook(channelID, data, threadID) {
|
||||||
|
@ -70,7 +70,7 @@ async function sendMessageWithWebhook(channelID, data, threadID) {
|
||||||
/**
|
/**
|
||||||
* @param {string} channelID
|
* @param {string} channelID
|
||||||
* @param {string} messageID
|
* @param {string} messageID
|
||||||
* @param {DiscordTypes.RESTPatchAPIWebhookWithTokenMessageJSONBody & {files?: {name: string, file: Buffer | Readable}[]}} data
|
* @param {DiscordTypes.RESTPatchAPIWebhookWithTokenMessageJSONBody & {files?: {name: string, file: Buffer | stream.Readable}[]}} data
|
||||||
* @param {string} [threadID]
|
* @param {string} [threadID]
|
||||||
*/
|
*/
|
||||||
async function editMessageWithWebhook(channelID, messageID, data, threadID) {
|
async function editMessageWithWebhook(channelID, messageID, data, threadID) {
|
||||||
|
|
|
@ -1,8 +1,7 @@
|
||||||
// @ts-check
|
// @ts-check
|
||||||
|
|
||||||
const {Readable} = require("stream")
|
const stream = require("stream")
|
||||||
const {sync} = require("../../passthrough")
|
const {sync} = require("../../passthrough")
|
||||||
const assert = require("assert").strict
|
|
||||||
|
|
||||||
/** @type {import("../converters/emoji-sheet")} */
|
/** @type {import("../converters/emoji-sheet")} */
|
||||||
const emojiSheetConverter = sync.require("../converters/emoji-sheet")
|
const emojiSheetConverter = sync.require("../converters/emoji-sheet")
|
||||||
|
@ -20,8 +19,7 @@ async function getAndConvertEmoji(mxc) {
|
||||||
// If we were using connection pooling, we would be forced to download the entire GIF.
|
// If we were using connection pooling, we would be forced to download the entire GIF.
|
||||||
// So we set no agent to ensure we are not connection pooling.
|
// So we set no agent to ensure we are not connection pooling.
|
||||||
const res = await api.getMedia(mxc, {signal: abortController.signal})
|
const res = await api.getMedia(mxc, {signal: abortController.signal})
|
||||||
// @ts-ignore
|
const readable = stream.Readable.fromWeb(res.body)
|
||||||
const readable = Readable.fromWeb(res.body)
|
|
||||||
return emojiSheetConverter.convertImageStream(readable, () => {
|
return emojiSheetConverter.convertImageStream(readable, () => {
|
||||||
abortController.abort()
|
abortController.abort()
|
||||||
readable.emit("end")
|
readable.emit("end")
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
const Ty = require("../../types")
|
const Ty = require("../../types")
|
||||||
const DiscordTypes = require("discord-api-types/v10")
|
const DiscordTypes = require("discord-api-types/v10")
|
||||||
const {Readable} = require("stream")
|
const stream = require("stream")
|
||||||
const assert = require("assert").strict
|
const assert = require("assert").strict
|
||||||
const crypto = require("crypto")
|
const crypto = require("crypto")
|
||||||
const passthrough = require("../../passthrough")
|
const passthrough = require("../../passthrough")
|
||||||
|
@ -22,8 +22,8 @@ const editMessage = sync.require("../../d2m/actions/edit-message")
|
||||||
const emojiSheet = sync.require("../actions/emoji-sheet")
|
const emojiSheet = sync.require("../actions/emoji-sheet")
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {DiscordTypes.RESTPostAPIWebhookWithTokenJSONBody & {files?: {name: string, file: Buffer | Readable}[], pendingFiles?: ({name: string, mxc: string} | {name: string, mxc: string, key: string, iv: string} | {name: string, buffer: Buffer | Readable})[]}} message
|
* @param {DiscordTypes.RESTPostAPIWebhookWithTokenJSONBody & {files?: {name: string, file: Buffer | stream.Readable}[], pendingFiles?: ({name: string, mxc: string} | {name: string, mxc: string, key: string, iv: string} | {name: string, buffer: Buffer | stream.Readable})[]}} message
|
||||||
* @returns {Promise<DiscordTypes.RESTPostAPIWebhookWithTokenJSONBody & {files?: {name: string, file: Buffer | Readable}[]}>}
|
* @returns {Promise<DiscordTypes.RESTPostAPIWebhookWithTokenJSONBody & {files?: {name: string, file: Buffer | stream.Readable}[]}>}
|
||||||
*/
|
*/
|
||||||
async function resolvePendingFiles(message) {
|
async function resolvePendingFiles(message) {
|
||||||
if (!message.pendingFiles) return message
|
if (!message.pendingFiles) return message
|
||||||
|
@ -37,15 +37,14 @@ async function resolvePendingFiles(message) {
|
||||||
if ("key" in p) {
|
if ("key" in p) {
|
||||||
// Encrypted file
|
// Encrypted file
|
||||||
const d = crypto.createDecipheriv("aes-256-ctr", Buffer.from(p.key, "base64url"), Buffer.from(p.iv, "base64url"))
|
const d = crypto.createDecipheriv("aes-256-ctr", Buffer.from(p.key, "base64url"), Buffer.from(p.iv, "base64url"))
|
||||||
await api.getMedia(p.mxc).then(res => Readable.fromWeb(res.body).pipe(d))
|
await api.getMedia(p.mxc).then(res => stream.Readable.fromWeb(res.body).pipe(d))
|
||||||
return {
|
return {
|
||||||
name: p.name,
|
name: p.name,
|
||||||
file: d
|
file: d
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Unencrypted file
|
// Unencrypted file
|
||||||
/** @type {Readable} */
|
const body = await api.getMedia(p.mxc).then(res => stream.Readable.fromWeb(res.body))
|
||||||
const body = await api.getMedia(p.mxc).then(res => Readable.fromWeb(res.body))
|
|
||||||
return {
|
return {
|
||||||
name: p.name,
|
name: p.name,
|
||||||
file: body
|
file: body
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
// @ts-check
|
// @ts-check
|
||||||
|
|
||||||
const assert = require("assert").strict
|
const assert = require("assert").strict
|
||||||
|
const stream = require("stream")
|
||||||
const {pipeline} = require("stream").promises
|
const {pipeline} = require("stream").promises
|
||||||
const sharp = require("sharp")
|
const sharp = require("sharp")
|
||||||
const {GIFrame} = require("@cloudrac3r/giframe")
|
const {GIFrame} = require("@cloudrac3r/giframe")
|
||||||
|
@ -48,7 +49,7 @@ async function compositeMatrixEmojis(mxcs, mxcDownloader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {NodeJS.ReadableStream} streamIn
|
* @param {stream.Readable} streamIn
|
||||||
* @param {() => any} stopStream
|
* @param {() => any} stopStream
|
||||||
* @returns {Promise<Buffer | undefined>} Uncompressed PNG image
|
* @returns {Promise<Buffer | undefined>} Uncompressed PNG image
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
const Ty = require("../../types")
|
const Ty = require("../../types")
|
||||||
const DiscordTypes = require("discord-api-types/v10")
|
const DiscordTypes = require("discord-api-types/v10")
|
||||||
const {Readable} = require("stream")
|
const stream = require("stream")
|
||||||
const chunk = require("chunk-text")
|
const chunk = require("chunk-text")
|
||||||
const TurndownService = require("@cloudrac3r/turndown")
|
const TurndownService = require("@cloudrac3r/turndown")
|
||||||
const domino = require("domino")
|
const domino = require("domino")
|
||||||
|
@ -820,7 +820,7 @@ async function eventToMessage(event, guild, di) {
|
||||||
|
|
||||||
// Split into 2000 character chunks
|
// Split into 2000 character chunks
|
||||||
const chunks = chunk(content, 2000)
|
const chunks = chunk(content, 2000)
|
||||||
/** @type {(DiscordTypes.RESTPostAPIWebhookWithTokenJSONBody & {files?: {name: string, file: Buffer | Readable}[]})[]} */
|
/** @type {(DiscordTypes.RESTPostAPIWebhookWithTokenJSONBody & {files?: {name: string, file: Buffer | stream.Readable}[]})[]} */
|
||||||
const messages = chunks.map(content => ({
|
const messages = chunks.map(content => ({
|
||||||
content,
|
content,
|
||||||
allowed_mentions: {
|
allowed_mentions: {
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
const Ty = require("../types")
|
const Ty = require("../types")
|
||||||
const assert = require("assert").strict
|
const assert = require("assert").strict
|
||||||
|
const streamWeb = require("stream/web")
|
||||||
|
|
||||||
const passthrough = require("../passthrough")
|
const passthrough = require("../passthrough")
|
||||||
const {sync} = passthrough
|
const {sync} = passthrough
|
||||||
|
@ -343,7 +344,7 @@ async function ping() {
|
||||||
/**
|
/**
|
||||||
* @param {string} mxc
|
* @param {string} mxc
|
||||||
* @param {RequestInit} [init]
|
* @param {RequestInit} [init]
|
||||||
* @return {Promise<Response & {body: import("stream/web").ReadableStream<Uint8Array>}>}
|
* @return {Promise<Response & {body: streamWeb.ReadableStream<Uint8Array>}>}
|
||||||
*/
|
*/
|
||||||
async function getMedia(mxc, init = {}) {
|
async function getMedia(mxc, init = {}) {
|
||||||
const mediaParts = mxc?.match(/^mxc:\/\/([^/]+)\/(\w+)$/)
|
const mediaParts = mxc?.match(/^mxc:\/\/([^/]+)\/(\w+)$/)
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
const mixin = require("@cloudrac3r/mixin-deep")
|
const mixin = require("@cloudrac3r/mixin-deep")
|
||||||
const stream = require("stream")
|
const stream = require("stream")
|
||||||
const {ReadableStream} = require("stream/web")
|
const streamWeb = require("stream/web")
|
||||||
const getStream = require("get-stream")
|
const getStream = require("get-stream")
|
||||||
|
|
||||||
const {reg, writeRegistration} = require("./read-registration.js")
|
const {reg, writeRegistration} = require("./read-registration.js")
|
||||||
|
@ -22,7 +22,7 @@ class MatrixServerError extends Error {
|
||||||
/**
|
/**
|
||||||
* @param {string} method
|
* @param {string} method
|
||||||
* @param {string} url
|
* @param {string} url
|
||||||
* @param {string | object | ReadableStream | stream.Readable} [body]
|
* @param {string | object | streamWeb.ReadableStream | stream.Readable} [body]
|
||||||
* @param {any} [extra]
|
* @param {any} [extra]
|
||||||
*/
|
*/
|
||||||
async function mreq(method, url, body, extra = {}) {
|
async function mreq(method, url, body, extra = {}) {
|
||||||
|
@ -30,7 +30,7 @@ async function mreq(method, url, body, extra = {}) {
|
||||||
body = JSON.stringify(body)
|
body = JSON.stringify(body)
|
||||||
} else if (body instanceof stream.Readable && reg.ooye.content_length_workaround) {
|
} else if (body instanceof stream.Readable && reg.ooye.content_length_workaround) {
|
||||||
body = await getStream.buffer(body)
|
body = await getStream.buffer(body)
|
||||||
} else if (body instanceof ReadableStream && reg.ooye.content_length_workaround) {
|
} else if (body instanceof streamWeb.ReadableStream && reg.ooye.content_length_workaround) {
|
||||||
body = await stream.consumers.buffer(stream.Readable.fromWeb(body))
|
body = await stream.consumers.buffer(stream.Readable.fromWeb(body))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
// @ts-check
|
// @ts-check
|
||||||
|
|
||||||
|
const assert = require("assert")
|
||||||
const fs = require("fs")
|
const fs = require("fs")
|
||||||
const {join} = require("path")
|
const {join} = require("path")
|
||||||
const h3 = require("h3")
|
const h3 = require("h3")
|
||||||
|
@ -38,8 +39,8 @@ function compressResponse(event, response) {
|
||||||
if (!getRequestHeader(event, "accept-encoding")?.includes("gzip")) return
|
if (!getRequestHeader(event, "accept-encoding")?.includes("gzip")) return
|
||||||
/* c8 ignore next */
|
/* c8 ignore next */
|
||||||
if (typeof response.body !== "string") return
|
if (typeof response.body !== "string") return
|
||||||
/** @type {ReadableStream} */ // @ts-ignore
|
|
||||||
const stream = new Response(response.body).body
|
const stream = new Response(response.body).body
|
||||||
|
assert(stream)
|
||||||
setResponseHeader(event, "content-encoding", "gzip")
|
setResponseHeader(event, "content-encoding", "gzip")
|
||||||
response.body = stream.pipeThrough(new CompressionStream("gzip"))
|
response.body = stream.pipeThrough(new CompressionStream("gzip"))
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
// @ts-check
|
// @ts-check
|
||||||
|
|
||||||
const {ReadableStream} = require("stream/web")
|
const streamWeb = require("stream/web")
|
||||||
const {test} = require("supertape")
|
const {test} = require("supertape")
|
||||||
const {router} = require("../../test/web")
|
const {router} = require("../../test/web")
|
||||||
const assert = require("assert").strict
|
const assert = require("assert").strict
|
||||||
|
@ -30,7 +30,7 @@ test("web server: compresses static resources", async t => {
|
||||||
"accept-encoding": "gzip"
|
"accept-encoding": "gzip"
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
assert(content instanceof ReadableStream)
|
assert(content instanceof streamWeb.ReadableStream)
|
||||||
const firstChunk = await content.getReader().read()
|
const firstChunk = await content.getReader().read()
|
||||||
t.ok(firstChunk.value instanceof Uint8Array, "can get data")
|
t.ok(firstChunk.value instanceof Uint8Array, "can get data")
|
||||||
t.deepEqual(firstChunk.value.slice(0, 3), Uint8Array.from([31, 139, 8]), "has compressed gzip header")
|
t.deepEqual(firstChunk.value.slice(0, 3), Uint8Array.from([31, 139, 8]), "has compressed gzip header")
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue