Retrigger events that arrive too early
This commit is contained in:
parent
149cee845a
commit
a2f0bb79db
3 changed files with 88 additions and 19 deletions
61
d2m/actions/retrigger.js
Normal file
61
d2m/actions/retrigger.js
Normal file
|
@ -0,0 +1,61 @@
|
||||||
|
// @ts-check
|
||||||
|
|
||||||
|
const {EventEmitter} = require("events")
|
||||||
|
const passthrough = require("../../passthrough")
|
||||||
|
const {select} = passthrough
|
||||||
|
|
||||||
|
const DEBUG_RETRIGGER = false
|
||||||
|
|
||||||
|
function debugRetrigger(message) {
|
||||||
|
if (DEBUG_RETRIGGER) {
|
||||||
|
console.log(message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const emitter = new EventEmitter()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Due to Eventual Consistency(TM) an update/delete may arrive before the original message arrives
|
||||||
|
* (or before the it has finished being bridged to an event).
|
||||||
|
* In this case, wait until the original message has finished bridging, then retrigger the passed function.
|
||||||
|
* @template {(...args: any) => Promise<any>} T
|
||||||
|
* @param {string} messageID
|
||||||
|
* @param {T} fn
|
||||||
|
* @param {Parameters<T>} rest
|
||||||
|
* @returns {boolean} false if the event was found and the function will be ignored, true if the event was not found and the function will be retriggered
|
||||||
|
*/
|
||||||
|
function eventNotFoundThenRetrigger(messageID, fn, ...rest) {
|
||||||
|
const eventID = select("event_message", "event_id", {message_id: messageID}).pluck().get()
|
||||||
|
if (eventID) {
|
||||||
|
debugRetrigger(`[retrigger] OK mid <-> eid = ${messageID} <-> ${eventID}`)
|
||||||
|
return false // event was found so don't retrigger
|
||||||
|
}
|
||||||
|
|
||||||
|
debugRetrigger(`[retrigger] WAIT mid <-> eid = ${messageID} <-> ${eventID}`)
|
||||||
|
emitter.addListener(messageID, () => {
|
||||||
|
debugRetrigger(`[retrigger] TRIGGER mid = ${messageID}`)
|
||||||
|
fn(...rest)
|
||||||
|
})
|
||||||
|
// if the event never arrives, don't trigger the callback, just clean up
|
||||||
|
setTimeout(() => {
|
||||||
|
if (emitter.listeners(messageID).length) {
|
||||||
|
debugRetrigger(`[retrigger] EXPIRE mid = ${messageID}`)
|
||||||
|
}
|
||||||
|
emitter.removeAllListeners(messageID)
|
||||||
|
}, 60 * 1000) // 1 minute
|
||||||
|
return true // event was not found, then retrigger
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Triggers any pending operations that were waiting on the corresponding event ID.
|
||||||
|
* @param {string} messageID
|
||||||
|
*/
|
||||||
|
function messageFinishedBridging(messageID) {
|
||||||
|
if (emitter.listeners(messageID).length) {
|
||||||
|
debugRetrigger(`[retrigger] EMIT mid = ${messageID}`)
|
||||||
|
}
|
||||||
|
emitter.emit(messageID)
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports.eventNotFoundThenRetrigger = eventNotFoundThenRetrigger
|
||||||
|
module.exports.messageFinishedBridging = messageFinishedBridging
|
|
@ -30,7 +30,8 @@ function eventCanBeEdited(ev) {
|
||||||
* @param {import("../../matrix/api")} api simple-as-nails dependency injection for the matrix API
|
* @param {import("../../matrix/api")} api simple-as-nails dependency injection for the matrix API
|
||||||
*/
|
*/
|
||||||
async function editToChanges(message, guild, api) {
|
async function editToChanges(message, guild, api) {
|
||||||
// If it is a user edit, allow deleting old messages (e.g. they might have removed text from an image). If it is the system adding a generated embed to a message, don't delete old messages since the system only sends partial data.
|
// If it is a user edit, allow deleting old messages (e.g. they might have removed text from an image).
|
||||||
|
// If it is the system adding a generated embed to a message, don't delete old messages since the system only sends partial data.
|
||||||
|
|
||||||
const isGeneratedEmbed = !("content" in message)
|
const isGeneratedEmbed = !("content" in message)
|
||||||
|
|
||||||
|
@ -38,22 +39,21 @@ async function editToChanges(message, guild, api) {
|
||||||
|
|
||||||
const roomID = select("channel_room", "room_id", {channel_id: message.channel_id}).pluck().get()
|
const roomID = select("channel_room", "room_id", {channel_id: message.channel_id}).pluck().get()
|
||||||
assert(roomID)
|
assert(roomID)
|
||||||
|
const oldEventRows = select("event_message", ["event_id", "event_type", "event_subtype", "part", "reaction_part"], {message_id: message.id}).all()
|
||||||
|
|
||||||
/** @type {string?} Null if we don't have a sender in the room, which will happen if it's a webhook's message. The bridge bot will do the edit instead. */
|
/** @type {string?} Null if we don't have a sender in the room, which will happen if it's a webhook's message. The bridge bot will do the edit instead. */
|
||||||
let senderMxid = null
|
let senderMxid = null
|
||||||
if (message.author) {
|
if (message.author) {
|
||||||
senderMxid = from("sim").join("sim_member", "mxid").where({user_id: message.author.id, room_id: roomID}).pluck("mxid").get() || null
|
senderMxid = from("sim").join("sim_member", "mxid").where({user_id: message.author.id, room_id: roomID}).pluck("mxid").get() || null
|
||||||
} else {
|
} else {
|
||||||
// Should be a system generated embed. We want the embed to be sent by the same user who sent the message, so that the messages get grouped in most clients.
|
// Should be a system generated embed. We want the embed to be sent by the same user who sent the message, so that the messages get grouped in most clients.
|
||||||
const eventID = select("event_message", "event_id", {message_id: message.id}).pluck().get()
|
const eventID = oldEventRows[0].event_id // a calling function should have already checked that there is at least one message to edit
|
||||||
assert(eventID) // this should have been checked earlier in a calling function
|
|
||||||
const event = await api.getEvent(roomID, eventID)
|
const event = await api.getEvent(roomID, eventID)
|
||||||
if (utils.eventSenderIsFromDiscord(event.sender)) {
|
if (utils.eventSenderIsFromDiscord(event.sender)) {
|
||||||
senderMxid = event.sender
|
senderMxid = event.sender
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const oldEventRows = select("event_message", ["event_id", "event_type", "event_subtype", "part", "reaction_part"], {message_id: message.id}).all()
|
|
||||||
|
|
||||||
// Figure out what we will be replacing them with
|
// Figure out what we will be replacing them with
|
||||||
|
|
||||||
const newFallbackContent = await messageToEvent.messageToEvent(message, guild, {includeEditFallbackStar: true}, {api})
|
const newFallbackContent = await messageToEvent.messageToEvent(message, guild, {includeEditFallbackStar: true}, {api})
|
||||||
|
|
|
@ -33,6 +33,8 @@ const discordCommandHandler = sync.require("../discord/discord-command-handler")
|
||||||
const mxUtils = require("../m2d/converters/utils")
|
const mxUtils = require("../m2d/converters/utils")
|
||||||
/** @type {import("./actions/speedbump")} */
|
/** @type {import("./actions/speedbump")} */
|
||||||
const speedbump = sync.require("./actions/speedbump")
|
const speedbump = sync.require("./actions/speedbump")
|
||||||
|
/** @type {import("./actions/retrigger")} */
|
||||||
|
const retrigger = sync.require("./actions/retrigger")
|
||||||
|
|
||||||
/** @type {any} */ // @ts-ignore bad types from semaphore
|
/** @type {any} */ // @ts-ignore bad types from semaphore
|
||||||
const Semaphore = require("@chriscdn/promise-semaphore")
|
const Semaphore = require("@chriscdn/promise-semaphore")
|
||||||
|
@ -252,6 +254,8 @@ module.exports = {
|
||||||
// @ts-ignore
|
// @ts-ignore
|
||||||
await sendMessage.sendMessage(message, channel, guild, row),
|
await sendMessage.sendMessage(message, channel, guild, row),
|
||||||
await discordCommandHandler.execute(message, channel, guild)
|
await discordCommandHandler.execute(message, channel, guild)
|
||||||
|
|
||||||
|
retrigger.messageFinishedBridging(message.id)
|
||||||
},
|
},
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -259,6 +263,14 @@ module.exports = {
|
||||||
* @param {DiscordTypes.GatewayMessageUpdateDispatchData} data
|
* @param {DiscordTypes.GatewayMessageUpdateDispatchData} data
|
||||||
*/
|
*/
|
||||||
async onMessageUpdate(client, data) {
|
async onMessageUpdate(client, data) {
|
||||||
|
// Based on looking at data they've sent me over the gateway, this is the best way to check for meaningful changes.
|
||||||
|
// If the message content is a string then it includes all interesting fields and is meaningful.
|
||||||
|
// Otherwise, if there are embeds, then the system generated URL preview embeds.
|
||||||
|
if (!(typeof data.content === "string" || "embeds" in data)) return
|
||||||
|
|
||||||
|
// Deal with Eventual Consistency(TM)
|
||||||
|
if (retrigger.eventNotFoundThenRetrigger(data.id, module.exports.onMessageUpdate, client, data)) return
|
||||||
|
|
||||||
if (data.webhook_id) {
|
if (data.webhook_id) {
|
||||||
const row = select("webhook", "webhook_id", {webhook_id: data.webhook_id}).pluck().get()
|
const row = select("webhook", "webhook_id", {webhook_id: data.webhook_id}).pluck().get()
|
||||||
if (row) return // The message was sent by the bridge's own webhook on discord. We don't want to reflect this back, so just drop it.
|
if (row) return // The message was sent by the bridge's own webhook on discord. We don't want to reflect this back, so just drop it.
|
||||||
|
@ -270,10 +282,6 @@ module.exports = {
|
||||||
const {affected, row} = await speedbump.maybeDoSpeedbump(data.channel_id, data.id)
|
const {affected, row} = await speedbump.maybeDoSpeedbump(data.channel_id, data.id)
|
||||||
if (affected) return
|
if (affected) return
|
||||||
|
|
||||||
// Based on looking at data they've sent me over the gateway, this is the best way to check for meaningful changes.
|
|
||||||
// If the message content is a string then it includes all interesting fields and is meaningful.
|
|
||||||
// Otherwise, if there are embeds, then the system generated URL preview embeds.
|
|
||||||
if (typeof data.content === "string" || "embeds" in data) {
|
|
||||||
/** @type {DiscordTypes.GatewayMessageCreateDispatchData} */
|
/** @type {DiscordTypes.GatewayMessageCreateDispatchData} */
|
||||||
// @ts-ignore
|
// @ts-ignore
|
||||||
const message = data
|
const message = data
|
||||||
|
@ -284,7 +292,6 @@ module.exports = {
|
||||||
assert(guild)
|
assert(guild)
|
||||||
// @ts-ignore
|
// @ts-ignore
|
||||||
await editMessage.editMessage(message, guild, row)
|
await editMessage.editMessage(message, guild, row)
|
||||||
}
|
|
||||||
},
|
},
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -311,6 +318,7 @@ module.exports = {
|
||||||
*/
|
*/
|
||||||
async onMessageDelete(client, data) {
|
async onMessageDelete(client, data) {
|
||||||
speedbump.onMessageDelete(data.id)
|
speedbump.onMessageDelete(data.id)
|
||||||
|
if (retrigger.eventNotFoundThenRetrigger(data.id, module.exports.onMessageDelete, client, data)) return
|
||||||
await deleteMessage.deleteMessage(data)
|
await deleteMessage.deleteMessage(data)
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue