diff --git a/src/d2m/actions/retrigger.js b/src/d2m/actions/retrigger.js index 2e241ec..66ef19e 100644 --- a/src/d2m/actions/retrigger.js +++ b/src/d2m/actions/retrigger.js @@ -2,17 +2,9 @@ const {EventEmitter} = require("events") const passthrough = require("../../passthrough") -const {select, sync, from} = passthrough -/** @type {import("../../matrix/utils")} */ -const utils = sync.require("../../matrix/utils") +const {select} = passthrough -/* - Due to Eventual Consistency(TM) an update/delete may arrive before the original message arrives - (or before the it has finished being bridged to an event). - In this case, wait until the original message has finished bridging, then retrigger the passed function. -*/ - -const DEBUG_RETRIGGER = true +const DEBUG_RETRIGGER = false function debugRetrigger(message) { if (DEBUG_RETRIGGER) { @@ -20,140 +12,81 @@ function debugRetrigger(message) { } } -const storage = new class { - /** @private @type {Set} */ - paused = new Set() - /** @private @type {Map any)[]>} id -> list of resolvers */ - resolves = new Map() - /** @private @type {Map>} id -> timer */ - timers = new Map() +const paused = new Set() +const emitter = new EventEmitter() - /** - * The purpose of storage is to store `resolve` and call it at a later time. - * @param {string} id - * @param {(found: Boolean) => any} resolve - */ - store(id, resolve) { - debugRetrigger(`[retrigger] STORE id = ${id}`) - this.resolves.set(id, (this.resolves.get(id) || []).concat(resolve)) // add to list in map value - if (!this.timers.has(id)) { - debugRetrigger(`[retrigger] SET TIMER id = ${id}`) - this.timers.set(id, setTimeout(() => this.resolve(id, false), 60 * 1000).unref()) // 1 minute - } - } - - /** @param {string} id */ - isNotPaused(id) { - return !storage.paused.has(id) - } - - /** @param {string} id */ - pause(id) { - debugRetrigger(`[retrigger] PAUSE id = ${id}`) - this.paused.add(id) - } - - /** - * Go through `resolves` storage and resolve them all. (Also resets timer/paused.) - * @param {string} id - * @param {boolean} value - */ - resolve(id, value) { - if (this.paused.has(id)) { - debugRetrigger(`[retrigger] RESUME id = ${id}`) - this.paused.delete(id) - } - - if (this.resolves.has(id)) { - debugRetrigger(`[retrigger] RESOLVE ${value} id = ${id}`) - const fns = this.resolves.get(id) || [] - this.resolves.delete(id) - for (const fn of fns) { - fn(value) +/** + * Due to Eventual Consistency(TM) an update/delete may arrive before the original message arrives + * (or before the it has finished being bridged to an event). + * In this case, wait until the original message has finished bridging, then retrigger the passed function. + * @template {(...args: any[]) => any} T + * @param {string} inputID + * @param {T} fn + * @param {Parameters} rest + * @returns {boolean} false if the event was found and the function will be ignored, true if the event was not found and the function will be retriggered + */ +function eventNotFoundThenRetrigger(inputID, fn, ...rest) { + if (!paused.has(inputID)) { + if (inputID.match(/^[0-9]+$/)) { + const eventID = select("event_message", "event_id", {message_id: inputID}).pluck().get() + if (eventID) { + debugRetrigger(`[retrigger] OK mid <-> eid = ${inputID} <-> ${eventID}`) + return false // event was found so don't retrigger + } + } else if (inputID.match(/^\$/)) { + const messageID = select("event_message", "message_id", {event_id: inputID}).pluck().get() + if (messageID) { + debugRetrigger(`[retrigger] OK eid <-> mid = ${inputID} <-> ${messageID}`) + return false // message was found so don't retrigger } } + } - if (this.timers.has(id)) { - clearTimeout(this.timers.get(id)) - this.timers.delete(id) + debugRetrigger(`[retrigger] WAIT id = ${inputID}`) + emitter.once(inputID, () => { + debugRetrigger(`[retrigger] TRIGGER id = ${inputID}`) + fn(...rest) + }) + // if the event never arrives, don't trigger the callback, just clean up + setTimeout(() => { + if (emitter.listeners(inputID).length) { + debugRetrigger(`[retrigger] EXPIRE id = ${inputID}`) } - } -} - -/** - * @param {string} id - * @param {(found: Boolean) => any} resolve - * @param {boolean} existsInDatabase - */ -function waitFor(id, resolve, existsInDatabase) { - if (existsInDatabase && storage.isNotPaused(id)) { // if event already exists and isn't paused then resolve immediately - debugRetrigger(`[retrigger] EXISTS id = ${id}`) - return resolve(true) - } - - // doesn't exist. wait for it to exist. storage will resolve true if it exists or false if it timed out - return storage.store(id, resolve) -} - -const GET_EVENT_PREPARED = from("event_message").select("event_id").and("WHERE event_id = ?").prepare().raw() -/** - * @param {string} eventID - * @returns {Promise} if true then the message did not arrive - */ -function waitForEvent(eventID) { - const {promise, resolve} = Promise.withResolvers() - waitFor(eventID, resolve, !!GET_EVENT_PREPARED.get(eventID)) - return promise -} - -const GET_MESSAGE_PREPARED = from("event_message").select("message_id").and("WHERE message_id = ?").prepare().raw() -/** - * @param {string} messageID - * @returns {Promise} if true then the message did not arrive - */ -function waitForMessage(messageID) { - const {promise, resolve} = Promise.withResolvers() - waitFor(messageID, resolve, !!GET_MESSAGE_PREPARED.get(messageID)) - return promise -} - -const GET_REACTION_EVENT_PREPARED = from("reaction").select("hashed_event_id").and("WHERE hashed_event_id = ?").prepare().raw() -/** - * @param {string} eventID - * @returns {Promise} if true then the message did not arrive - */ -function waitForReactionEvent(eventID) { - const {promise, resolve} = Promise.withResolvers() - waitFor(eventID, resolve, !!GET_REACTION_EVENT_PREPARED.get(utils.getEventIDHash(eventID))) - return promise + emitter.removeAllListeners(inputID) + }, 60 * 1000) // 1 minute + return true // event was not found, then retrigger } /** * Anything calling retrigger during the callback will be paused and retriggered after the callback resolves. * @template T - * @param {string} id + * @param {string} messageID * @param {Promise} promise * @returns {Promise} */ -async function pauseChanges(id, promise) { +async function pauseChanges(messageID, promise) { try { - storage.pause(id) + debugRetrigger(`[retrigger] PAUSE id = ${messageID}`) + paused.add(messageID) return await promise } finally { - finishedBridging(id) + debugRetrigger(`[retrigger] RESUME id = ${messageID}`) + paused.delete(messageID) + messageFinishedBridging(messageID) } } /** * Triggers any pending operations that were waiting on the corresponding event ID. - * @param {string} id + * @param {string} messageID */ -function finishedBridging(id) { - storage.resolve(id, true) +function messageFinishedBridging(messageID) { + if (emitter.listeners(messageID).length) { + debugRetrigger(`[retrigger] EMIT id = ${messageID}`) + } + emitter.emit(messageID) } -module.exports.waitForMessage = waitForMessage -module.exports.waitForEvent = waitForEvent -module.exports.waitForReactionEvent = waitForReactionEvent +module.exports.eventNotFoundThenRetrigger = eventNotFoundThenRetrigger +module.exports.messageFinishedBridging = messageFinishedBridging module.exports.pauseChanges = pauseChanges -module.exports.finishedBridging = finishedBridging \ No newline at end of file diff --git a/src/d2m/converters/remove-reaction.js b/src/d2m/converters/remove-reaction.js index b6b0407..4ca22b6 100644 --- a/src/d2m/converters/remove-reaction.js +++ b/src/d2m/converters/remove-reaction.js @@ -34,7 +34,7 @@ function removeReaction(data, reactions, key) { // Even though the bridge bot only reacted once on Discord-side, multiple Matrix users may have // reacted on Matrix-side. Semantically, we want to remove the reaction from EVERY Matrix user. // Also need to clean up the database. - const hash = utils.getEventIDHash(eventID) + const hash = utils.getEventIDHash(event.event_id) removals.push({eventID, mxid: null, hash}) } if (!lookingAtMatrixReaction && !wantToRemoveMatrixReaction) { diff --git a/src/d2m/event-dispatcher.js b/src/d2m/event-dispatcher.js index 4a16097..b6593ec 100644 --- a/src/d2m/event-dispatcher.js +++ b/src/d2m/event-dispatcher.js @@ -2,7 +2,6 @@ const assert = require("assert").strict const DiscordTypes = require("discord-api-types/v10") -const {id: botID} = require("../../addbot") const {sync, db, select, from} = require("../passthrough") /** @type {import("./actions/send-message")}) */ @@ -39,8 +38,6 @@ const removeMember = sync.require("./actions/remove-member") const vote = sync.require("./actions/poll-vote") /** @type {import("../m2d/event-dispatcher")} */ const matrixEventDispatcher = sync.require("../m2d/event-dispatcher") -/** @type {import("../m2d/actions/redact.js")} */ -const redact = sync.require("../m2d/actions/redact.js") /** @type {import("../discord/interactions/matrix-info")} */ const matrixInfoInteraction = sync.require("../discord/interactions/matrix-info") /** @type {import("../agi/listener")} */ @@ -324,7 +321,7 @@ module.exports = { // @ts-ignore await sendMessage.sendMessage(message, channel, guild, row) - retrigger.finishedBridging(message.id) + retrigger.messageFinishedBridging(message.id) }, /** @@ -345,7 +342,7 @@ module.exports = { if (!row) { // Check that the sending-to room exists, and deal with Eventual Consistency(TM) - if (!await retrigger.waitForMessage(data.id)) return + if (retrigger.eventNotFoundThenRetrigger(data.id, module.exports.MESSAGE_UPDATE, client, data)) return } /** @type {DiscordTypes.GatewayMessageCreateDispatchData} */ @@ -383,16 +380,6 @@ module.exports = { * @param {DiscordTypes.GatewayMessageReactionRemoveDispatchData | DiscordTypes.GatewayMessageReactionRemoveEmojiDispatchData | DiscordTypes.GatewayMessageReactionRemoveAllDispatchData} data */ async onSomeReactionsRemoved(client, data) { - // Don't attempt to double-bridge our own deleted reactions, this would go badly if there are race conditions - if ("user_id" in data && data.user_id === botID && data.emoji.name) { // sure hope data.emoji.name exists here - const encodedEmoji = encodeURIComponent(data.emoji.id ? `${data.emoji.name}:${data.emoji.id}` : data.emoji.name) - const i = redact.ourDeletedReactions.findIndex(x => data.message_id === x.message_id && encodedEmoji === x.encoded_emoji) - if (i !== -1) { - redact.ourDeletedReactions.splice(i, 1) - return - } - } - await removeReaction.removeSomeReactions(data) }, @@ -402,7 +389,7 @@ module.exports = { */ async MESSAGE_DELETE(client, data) { speedbump.onMessageDelete(data.id) - if (!await retrigger.waitForMessage(data.id)) return + if (retrigger.eventNotFoundThenRetrigger(data.id, module.exports.MESSAGE_DELETE, client, data)) return await deleteMessage.deleteMessage(data) }, @@ -450,12 +437,12 @@ module.exports = { * @param {DiscordTypes.GatewayMessagePollVoteDispatchData} data */ async MESSAGE_POLL_VOTE_ADD(client, data) { - if (!await retrigger.waitForMessage(data.message_id)) return + if (retrigger.eventNotFoundThenRetrigger(data.message_id, module.exports.MESSAGE_POLL_VOTE_ADD, client, data)) return await vote.addVote(data) }, async MESSAGE_POLL_VOTE_REMOVE(client, data) { - if (!await retrigger.waitForMessage(data.message_id)) return + if (retrigger.eventNotFoundThenRetrigger(data.message_id, module.exports.MESSAGE_POLL_VOTE_REMOVE, client, data)) return await vote.removeVote(data) }, diff --git a/src/db/orm.js b/src/db/orm.js index 8763314..4d9b6f1 100644 --- a/src/db/orm.js +++ b/src/db/orm.js @@ -104,16 +104,6 @@ class From { return r } - pluckUnsafe(col) { - /** @type {Pluck} */ - // @ts-ignore - const r = this - r.cols = [col] - r.makeColsSafe = false - r.isPluck = true - return r - } - /** * @param {string} sql */ diff --git a/src/db/orm.test.js b/src/db/orm.test.js index 4639090..6f6018e 100644 --- a/src/db/orm.test.js +++ b/src/db/orm.test.js @@ -68,8 +68,3 @@ test("orm: select unsafe works (to select complex column names that can't be typ .all() t.equal(results[0].power_level, 150) }) - -test("orm: pluck unsafe works (to select complex column names that can't be type verified)", t => { - const result = from("channel_room").where({guild_id: "112760669178241024"}).pluckUnsafe("count(*)").get() - t.equal(result, 7) -}) diff --git a/src/m2d/actions/add-reaction.js b/src/m2d/actions/add-reaction.js index c453244..e4981fb 100644 --- a/src/m2d/actions/add-reaction.js +++ b/src/m2d/actions/add-reaction.js @@ -17,7 +17,7 @@ const retrigger = sync.require("../../d2m/actions/retrigger") */ async function addReaction(event) { // Wait until the corresponding channel and message have already been bridged - if (!await retrigger.waitForEvent(event.content["m.relates_to"].event_id)) return + if (retrigger.eventNotFoundThenRetrigger(event.content["m.relates_to"].event_id, () => as.emit("type:m.reaction", event))) return // These will exist because it passed retrigger const row = from("event_message").join("message_room", "message_id").join("historical_channel_room", "historical_room_index") @@ -50,8 +50,6 @@ async function addReaction(event) { } db.prepare("REPLACE INTO reaction (hashed_event_id, message_id, encoded_emoji, original_encoding) VALUES (?, ?, ?, ?)").run(utils.getEventIDHash(event.event_id), messageID, discordPreferredEncoding, key) - - retrigger.finishedBridging(event.event_id) } module.exports.addReaction = addReaction diff --git a/src/m2d/actions/redact.js b/src/m2d/actions/redact.js index 3a174f5..3135d31 100644 --- a/src/m2d/actions/redact.js +++ b/src/m2d/actions/redact.js @@ -10,9 +10,6 @@ const utils = sync.require("../../matrix/utils") /** @type {import("../../d2m/actions/retrigger")} */ const retrigger = sync.require("../../d2m/actions/retrigger") -/** @type {{message_id: string, encoded_emoji: string}[]} */ -const ourDeletedReactions = [] - /** * @param {Ty.Event.Outer_M_Room_Redaction} event */ @@ -27,21 +24,6 @@ async function deleteMessage(event) { db.prepare("DELETE FROM message_room WHERE message_id = ?").run(rows[0].message_id) } -/** - * @param {Ty.Event.Outer_M_Room_Redaction} event - */ -async function removeMessageEvent(event) { - // Could be for removing a message or suppressing embeds. For more information, the message needs to be bridged first. - if (!await retrigger.waitForEvent(event.redacts)) return - - const row = select("event_message", ["event_type", "event_subtype", "part"], {event_id: event.redacts}).get() - if (row && row.event_type === "m.room.message" && row.event_subtype === "m.notice" && row.part === 1) { - await suppressEmbeds(event) - } else { - await deleteMessage(event) - } -} - /** * @param {Ty.Event.Outer_M_Room_Redaction} event */ @@ -59,18 +41,11 @@ async function suppressEmbeds(event) { * @param {Ty.Event.Outer_M_Room_Redaction} event */ async function removeReaction(event) { - if (!await retrigger.waitForReactionEvent(event.redacts)) return - const hash = utils.getEventIDHash(event.redacts) const row = from("reaction").join("message_room", "message_id").join("historical_channel_room", "historical_room_index") .select("reference_channel_id", "message_id", "encoded_emoji").where({hashed_event_id: hash}).get() if (!row) return - // See how many Matrix-side reactions there are, and delete if it's the last one - const numberOfReactions = from("reaction").where({message_id: row.message_id, encoded_emoji: row.encoded_emoji}).pluckUnsafe("count(*)").get() - if (numberOfReactions === 1) { - ourDeletedReactions.push(row) - await discord.snow.channel.deleteReactionSelf(row.reference_channel_id, row.message_id, row.encoded_emoji) - } + await discord.snow.channel.deleteReactionSelf(row.reference_channel_id, row.message_id, row.encoded_emoji) db.prepare("DELETE FROM reaction WHERE hashed_event_id = ?").run(hash) } @@ -79,12 +54,18 @@ async function removeReaction(event) { * @param {Ty.Event.Outer_M_Room_Redaction} event */ async function handle(event) { - // Don't know if it's a redaction for a reaction or an event, try both at the same time (otherwise waitFor will block) - await Promise.all([ - removeMessageEvent(event), - removeReaction(event) - ]) + // If this is for removing a reaction, try it + await removeReaction(event) + + // Or, it might be for removing a message or suppressing embeds. But to do that, the message needs to be bridged first. + if (retrigger.eventNotFoundThenRetrigger(event.redacts, () => as.emit("type:m.room.redaction", event))) return + + const row = select("event_message", ["event_type", "event_subtype", "part"], {event_id: event.redacts}).get() + if (row && row.event_type === "m.room.message" && row.event_subtype === "m.notice" && row.part === 1) { + await suppressEmbeds(event) + } else { + await deleteMessage(event) + } } module.exports.handle = handle -module.exports.ourDeletedReactions = ourDeletedReactions \ No newline at end of file diff --git a/src/m2d/event-dispatcher.js b/src/m2d/event-dispatcher.js index 3d1e090..c11b696 100644 --- a/src/m2d/event-dispatcher.js +++ b/src/m2d/event-dispatcher.js @@ -211,7 +211,7 @@ async event => { // @ts-ignore await matrixCommandHandler.execute(event) } - retrigger.finishedBridging(event.event_id) + retrigger.messageFinishedBridging(event.event_id) await api.ackEvent(event) })) @@ -222,7 +222,7 @@ sync.addTemporaryListener(as, "type:m.sticker", guard("m.sticker", async event => { if (utils.eventSenderIsFromDiscord(event.sender)) return const messageResponses = await sendEvent.sendEvent(event) - retrigger.finishedBridging(event.event_id) + retrigger.messageFinishedBridging(event.event_id) await api.ackEvent(event) })) diff --git a/src/stdin.js b/src/stdin.js index 43f9607..2548d42 100644 --- a/src/stdin.js +++ b/src/stdin.js @@ -15,15 +15,12 @@ const mreq = sync.require("./matrix/mreq") const api = sync.require("./matrix/api") const file = sync.require("./matrix/file") const sendEvent = sync.require("./m2d/actions/send-event") -const redact = sync.require("./m2d/actions/redact") const eventDispatcher = sync.require("./d2m/event-dispatcher") const updatePins = sync.require("./d2m/actions/update-pins") const speedbump = sync.require("./d2m/actions/speedbump") const ks = sync.require("./matrix/kstate") const setPresence = sync.require("./d2m/actions/set-presence") const channelWebhook = sync.require("./m2d/actions/channel-webhook") -const dUtils = sync.require("./discord/utils") -const mxUtils = sync.require("./matrix/utils") const guildID = "112760669178241024" async function ping() {