From a2f0bb79db93d89ecef07427f84cac40c2594e96 Mon Sep 17 00:00:00 2001 From: Cadence Ember Date: Tue, 16 Jul 2024 23:48:03 +1200 Subject: [PATCH] Retrigger events that arrive too early --- d2m/actions/retrigger.js | 61 +++++++++++++++++++++++++++++++ d2m/converters/edit-to-changes.js | 10 ++--- d2m/event-dispatcher.js | 36 +++++++++++------- 3 files changed, 88 insertions(+), 19 deletions(-) create mode 100644 d2m/actions/retrigger.js diff --git a/d2m/actions/retrigger.js b/d2m/actions/retrigger.js new file mode 100644 index 00000000..2475daf5 --- /dev/null +++ b/d2m/actions/retrigger.js @@ -0,0 +1,61 @@ +// @ts-check + +const {EventEmitter} = require("events") +const passthrough = require("../../passthrough") +const {select} = passthrough + +const DEBUG_RETRIGGER = false + +function debugRetrigger(message) { + if (DEBUG_RETRIGGER) { + console.log(message) + } +} + +const emitter = new EventEmitter() + +/** + * Due to Eventual Consistency(TM) an update/delete may arrive before the original message arrives + * (or before the it has finished being bridged to an event). + * In this case, wait until the original message has finished bridging, then retrigger the passed function. + * @template {(...args: any) => Promise} T + * @param {string} messageID + * @param {T} fn + * @param {Parameters} rest + * @returns {boolean} false if the event was found and the function will be ignored, true if the event was not found and the function will be retriggered + */ +function eventNotFoundThenRetrigger(messageID, fn, ...rest) { + const eventID = select("event_message", "event_id", {message_id: messageID}).pluck().get() + if (eventID) { + debugRetrigger(`[retrigger] OK mid <-> eid = ${messageID} <-> ${eventID}`) + return false // event was found so don't retrigger + } + + debugRetrigger(`[retrigger] WAIT mid <-> eid = ${messageID} <-> ${eventID}`) + emitter.addListener(messageID, () => { + debugRetrigger(`[retrigger] TRIGGER mid = ${messageID}`) + fn(...rest) + }) + // if the event never arrives, don't trigger the callback, just clean up + setTimeout(() => { + if (emitter.listeners(messageID).length) { + debugRetrigger(`[retrigger] EXPIRE mid = ${messageID}`) + } + emitter.removeAllListeners(messageID) + }, 60 * 1000) // 1 minute + return true // event was not found, then retrigger +} + +/** + * Triggers any pending operations that were waiting on the corresponding event ID. + * @param {string} messageID + */ +function messageFinishedBridging(messageID) { + if (emitter.listeners(messageID).length) { + debugRetrigger(`[retrigger] EMIT mid = ${messageID}`) + } + emitter.emit(messageID) +} + +module.exports.eventNotFoundThenRetrigger = eventNotFoundThenRetrigger +module.exports.messageFinishedBridging = messageFinishedBridging diff --git a/d2m/converters/edit-to-changes.js b/d2m/converters/edit-to-changes.js index dd8e39f8..c80bb497 100644 --- a/d2m/converters/edit-to-changes.js +++ b/d2m/converters/edit-to-changes.js @@ -30,7 +30,8 @@ function eventCanBeEdited(ev) { * @param {import("../../matrix/api")} api simple-as-nails dependency injection for the matrix API */ async function editToChanges(message, guild, api) { - // If it is a user edit, allow deleting old messages (e.g. they might have removed text from an image). If it is the system adding a generated embed to a message, don't delete old messages since the system only sends partial data. + // If it is a user edit, allow deleting old messages (e.g. they might have removed text from an image). + // If it is the system adding a generated embed to a message, don't delete old messages since the system only sends partial data. const isGeneratedEmbed = !("content" in message) @@ -38,22 +39,21 @@ async function editToChanges(message, guild, api) { const roomID = select("channel_room", "room_id", {channel_id: message.channel_id}).pluck().get() assert(roomID) + const oldEventRows = select("event_message", ["event_id", "event_type", "event_subtype", "part", "reaction_part"], {message_id: message.id}).all() + /** @type {string?} Null if we don't have a sender in the room, which will happen if it's a webhook's message. The bridge bot will do the edit instead. */ let senderMxid = null if (message.author) { senderMxid = from("sim").join("sim_member", "mxid").where({user_id: message.author.id, room_id: roomID}).pluck("mxid").get() || null } else { // Should be a system generated embed. We want the embed to be sent by the same user who sent the message, so that the messages get grouped in most clients. - const eventID = select("event_message", "event_id", {message_id: message.id}).pluck().get() - assert(eventID) // this should have been checked earlier in a calling function + const eventID = oldEventRows[0].event_id // a calling function should have already checked that there is at least one message to edit const event = await api.getEvent(roomID, eventID) if (utils.eventSenderIsFromDiscord(event.sender)) { senderMxid = event.sender } } - const oldEventRows = select("event_message", ["event_id", "event_type", "event_subtype", "part", "reaction_part"], {message_id: message.id}).all() - // Figure out what we will be replacing them with const newFallbackContent = await messageToEvent.messageToEvent(message, guild, {includeEditFallbackStar: true}, {api}) diff --git a/d2m/event-dispatcher.js b/d2m/event-dispatcher.js index ec047500..7f27b77e 100644 --- a/d2m/event-dispatcher.js +++ b/d2m/event-dispatcher.js @@ -33,6 +33,8 @@ const discordCommandHandler = sync.require("../discord/discord-command-handler") const mxUtils = require("../m2d/converters/utils") /** @type {import("./actions/speedbump")} */ const speedbump = sync.require("./actions/speedbump") +/** @type {import("./actions/retrigger")} */ +const retrigger = sync.require("./actions/retrigger") /** @type {any} */ // @ts-ignore bad types from semaphore const Semaphore = require("@chriscdn/promise-semaphore") @@ -252,6 +254,8 @@ module.exports = { // @ts-ignore await sendMessage.sendMessage(message, channel, guild, row), await discordCommandHandler.execute(message, channel, guild) + + retrigger.messageFinishedBridging(message.id) }, /** @@ -259,6 +263,14 @@ module.exports = { * @param {DiscordTypes.GatewayMessageUpdateDispatchData} data */ async onMessageUpdate(client, data) { + // Based on looking at data they've sent me over the gateway, this is the best way to check for meaningful changes. + // If the message content is a string then it includes all interesting fields and is meaningful. + // Otherwise, if there are embeds, then the system generated URL preview embeds. + if (!(typeof data.content === "string" || "embeds" in data)) return + + // Deal with Eventual Consistency(TM) + if (retrigger.eventNotFoundThenRetrigger(data.id, module.exports.onMessageUpdate, client, data)) return + if (data.webhook_id) { const row = select("webhook", "webhook_id", {webhook_id: data.webhook_id}).pluck().get() if (row) return // The message was sent by the bridge's own webhook on discord. We don't want to reflect this back, so just drop it. @@ -270,21 +282,16 @@ module.exports = { const {affected, row} = await speedbump.maybeDoSpeedbump(data.channel_id, data.id) if (affected) return - // Based on looking at data they've sent me over the gateway, this is the best way to check for meaningful changes. - // If the message content is a string then it includes all interesting fields and is meaningful. - // Otherwise, if there are embeds, then the system generated URL preview embeds. - if (typeof data.content === "string" || "embeds" in data) { - /** @type {DiscordTypes.GatewayMessageCreateDispatchData} */ - // @ts-ignore - const message = data + /** @type {DiscordTypes.GatewayMessageCreateDispatchData} */ + // @ts-ignore + const message = data - const channel = client.channels.get(message.channel_id) - if (!channel || !("guild_id" in channel) || !channel.guild_id) return // Nothing we can do in direct messages. - const guild = client.guilds.get(channel.guild_id) - assert(guild) - // @ts-ignore - await editMessage.editMessage(message, guild, row) - } + const channel = client.channels.get(message.channel_id) + if (!channel || !("guild_id" in channel) || !channel.guild_id) return // Nothing we can do in direct messages. + const guild = client.guilds.get(channel.guild_id) + assert(guild) + // @ts-ignore + await editMessage.editMessage(message, guild, row) }, /** @@ -311,6 +318,7 @@ module.exports = { */ async onMessageDelete(client, data) { speedbump.onMessageDelete(data.id) + if (retrigger.eventNotFoundThenRetrigger(data.id, module.exports.onMessageDelete, client, data)) return await deleteMessage.deleteMessage(data) },