Don't delete our reaction on Discord unless we have 0 of that reaction coming from Matrix #85

Open
ellie wants to merge 8 commits from ellie/out-of-your-element:ellie-fix-reacts into main
9 changed files with 312 additions and 98 deletions
Showing only changes of commit a0f8b02c55 - Show all commits

View file

@ -33,6 +33,7 @@ async function removeSomeReactions(data) {
( "user_id" in data ? removeReaction(data, reactions)
: "emoji" in data ? removeEmojiReaction(data, reactions)
: removeAllReactions(data, reactions))
console.log("ELLIE TEST: data:", data, "removals:", removals)
// Redact the events and delete individual stored events in the database
for (const removal of removals) {

View file

@ -0,0 +1,143 @@
// we are going to delete this file before committing
// just here so it's easy to see what our prior code was while programming the new version.
// @ts-check
const {EventEmitter} = require("events")
const passthrough = require("../../passthrough")
const {select, sync} = passthrough
/** @type {import("../../matrix/utils")} */
const utils = sync.require("../../matrix/utils")
/*
Due to Eventual Consistency(TM) an update/delete may arrive before the original message arrives
(or before the it has finished being bridged to an event).
In this case, wait until the original message has finished bridging, then retrigger the passed function.
*/
const DEBUG_RETRIGGER = false
function debugRetrigger(message) {
if (DEBUG_RETRIGGER) {
console.log(message)
}
}
const paused = new Set()
const emitter = new EventEmitter()
/**
* @template {(...args: any[]) => any} T
* @param {string} eventID
* @param {T} fn
* @param {Parameters<T>} rest
* @returns {boolean} false if the event was found and the function will be ignored, true if the event was not found and the function will be retriggered
*/
function eventNotFoundThenRetrigger(eventID, fn, ...rest) {
if (!paused.has(eventID)) {
const messageID = select("event_message", "message_id", {event_id: eventID}).pluck().get()
if (messageID) {
debugRetrigger(`[retrigger] OK eid <-> mid = ${eventID} <-> ${messageID}`)
return false // message was found so don't retrigger
}
}
waitThenRetrigger(eventID, fn, ...rest)
return true
}
/**
* @template {(...args: any[]) => any} T
* @param {string} messageID
* @param {T} fn
* @param {Parameters<T>} rest
* @returns {boolean} false if the event was found and the function will be ignored, true if the event was not found and the function will be retriggered
*/
function messageNotFoundThenRetrigger(messageID, fn, ...rest) {
if (!paused.has(messageID)) {
const eventID = select("event_message", "event_id", {message_id: messageID}).pluck().get()
if (eventID) {
debugRetrigger(`[retrigger] OK mid <-> eid = ${messageID} <-> ${eventID}`)
return false // event was found so don't retrigger
}
}
waitThenRetrigger(messageID, fn, ...rest)
return true
}
/**
* @template {(...args: any[]) => any} T
* @param {string} reactionEventID
* @param {T} fn
* @param {Parameters<T>} rest
* @returns {boolean} false if the event was found and the function will be ignored, true if the event was not found and the function will be retriggered
*/
function reactionNotFoundThenRetrigger(reactionEventID, fn, ...rest){
const reactionEventHash = utils.getEventIDHash(reactionEventID)
const reaction = select("reaction", "encoded_emoji", {hashed_event_id: reactionEventHash})
if (reaction) {
debugRetrigger(`[retrigger] OK eid <-> reaction = ${reactionEventID} <-> ${reactionEventHash}`)
return false
}
waitThenRetrigger(reactionEventID, fn, ...rest)
return true
}
/**
* @template {(...args: any[]) => any} T
* @param {string} id
* @param {T} fn
* @param {Parameters<T>} rest
*/
function waitThenRetrigger(id, fn, ...rest){
debugRetrigger(`[retrigger] WAIT id = ${id}`)
emitter.once(id, () => {
debugRetrigger(`[retrigger] TRIGGER id = ${id}`)
fn(...rest)
})
// if the event never arrives, don't trigger the callback, just clean up
setTimeout(() => {
if (emitter.listeners(id).length) {
debugRetrigger(`[retrigger] EXPIRE id = ${id}`)
}
emitter.removeAllListeners(id)
}, 60 * 1000) // 1 minute
}
/**
* Anything calling retrigger during the callback will be paused and retriggered after the callback resolves.
* @template T
* @param {string} messageID
* @param {Promise<T>} promise
* @returns {Promise<T>}
*/
async function pauseChanges(messageID, promise) {
try {
debugRetrigger(`[retrigger] PAUSE id = ${messageID}`)
paused.add(messageID)
return await promise
} finally {
debugRetrigger(`[retrigger] RESUME id = ${messageID}`)
paused.delete(messageID)
finishedBridging(messageID)
}
}
/**
* Triggers any pending operations that were waiting on the corresponding event ID.
* @param {string} id
*/
function finishedBridging(id) {
if (emitter.listeners(id).length) {
debugRetrigger(`[retrigger] EMIT id = ${id}`)
}
emitter.emit(id)
}
module.exports.eventNotFoundThenRetrigger = eventNotFoundThenRetrigger
module.exports.messageNotFoundThenRetrigger = messageNotFoundThenRetrigger
module.exports.reactionNotFoundThenRetrigger = reactionNotFoundThenRetrigger
module.exports.finishedBridging = finishedBridging
module.exports.pauseChanges = pauseChanges

View file

@ -2,11 +2,17 @@
const {EventEmitter} = require("events")
const passthrough = require("../../passthrough")
const {select, sync} = passthrough
const {select, sync, from} = passthrough
/** @type {import("../../matrix/utils")} */
const utils = sync.require("../../matrix/utils")
const DEBUG_RETRIGGER = false
/*
Due to Eventual Consistency(TM) an update/delete may arrive before the original message arrives
(or before the it has finished being bridged to an event).
In this case, wait until the original message has finished bridging, then retrigger the passed function.
*/
const DEBUG_RETRIGGER = true
function debugRetrigger(message) {
if (DEBUG_RETRIGGER) {
@ -14,106 +20,140 @@ function debugRetrigger(message) {
}
}
const paused = new Set()
const emitter = new EventEmitter()
const storage = new class {
/** @private @type {Set<string>} */
paused = new Set()
/** @private @type {Map<string, ((found: Boolean) => any)[]>} id -> list of resolvers */
resolves = new Map()
/** @private @type {Map<string, ReturnType<setTimeout>>} id -> timer */
timers = new Map()
/**
* Due to Eventual Consistency(TM) an update/delete may arrive before the original message arrives
* (or before the it has finished being bridged to an event).
* In this case, wait until the original message has finished bridging, then retrigger the passed function.
* @template {(...args: any[]) => any} T
* @param {string} inputID
* @param {T} fn
* @param {Parameters<T>} rest
* @returns {boolean} false if the event was found and the function will be ignored, true if the event was not found and the function will be retriggered
* The purpose of storage is to store `resolve` and call it at a later time.
* @param {string} id
* @param {(found: Boolean) => any} resolve
*/
function eventNotFoundThenRetrigger(inputID, fn, ...rest) {
if (!paused.has(inputID)) {
if (inputID.match(/^[0-9]+$/)) {
const eventID = select("event_message", "event_id", {message_id: inputID}).pluck().get()
if (eventID) {
debugRetrigger(`[retrigger] OK mid <-> eid = ${inputID} <-> ${eventID}`)
return false // event was found so don't retrigger
store(id, resolve) {
debugRetrigger(`[retrigger] STORE id = ${id}`)
this.resolves.set(id, (this.resolves.get(id) || []).concat(resolve)) // add to list in map value
if (!this.timers.has(id)) {
debugRetrigger(`[retrigger] SET TIMER id = ${id}`)
this.timers.set(id, setTimeout(() => this.resolve(id, false), 60 * 1000).unref()) // 1 minute
}
} else if (inputID.match(/^\$/)) {
const messageID = select("event_message", "message_id", {event_id: inputID}).pluck().get()
if (messageID) {
debugRetrigger(`[retrigger] OK eid <-> mid = ${inputID} <-> ${messageID}`)
return false // message was found so don't retrigger
}
/** @param {string} id */
isNotPaused(id) {
return !storage.paused.has(id)
}
/** @param {string} id */
pause(id) {
debugRetrigger(`[retrigger] PAUSE id = ${id}`)
this.paused.add(id)
}
/**
* Go through `resolves` storage and resolve them all. (Also resets timer/paused.)
* @param {string} id
* @param {boolean} value
*/
resolve(id, value) {
if (this.paused.has(id)) {
debugRetrigger(`[retrigger] RESUME id = ${id}`)
this.paused.delete(id)
}
if (this.resolves.has(id)) {
debugRetrigger(`[retrigger] RESOLVE ${value} id = ${id}`)
const fns = this.resolves.get(id) || []
this.resolves.delete(id)
for (const fn of fns) {
fn(value)
}
}
if (this.timers.has(id)) {
clearTimeout(this.timers.get(id))
this.timers.delete(id)
}
}
}
debugRetrigger(`[retrigger] WAIT id = ${inputID}`)
emitter.once(inputID, () => {
debugRetrigger(`[retrigger] TRIGGER id = ${inputID}`)
fn(...rest)
})
// if the event never arrives, don't trigger the callback, just clean up
setTimeout(() => {
if (emitter.listeners(inputID).length) {
debugRetrigger(`[retrigger] EXPIRE id = ${inputID}`)
}
emitter.removeAllListeners(inputID)
}, 60 * 1000) // 1 minute
return true // event was not found, then retrigger
/**
* @param {string} id
* @param {(found: Boolean) => any} resolve
* @param {boolean} existsInDatabase
*/
function waitFor(id, resolve, existsInDatabase) {
if (existsInDatabase && storage.isNotPaused(id)) { // if event already exists and isn't paused then resolve immediately
debugRetrigger(`[retrigger] EXISTS id = ${id}`)
return resolve(true)
}
function reactionNotFoundThenRetrigger(reactionEventID, fn, ...rest){
const reactionEventHash = utils.getEventIDHash(reactionEventID)
const reaction = select("reaction", "encoded_emoji", {hashed_event_id: reactionEventHash})
if (reaction) {
debugRetrigger(`[retrigger] OK eid <-> reaction = ${reactionEventID} <-> ${reactionEventHash}`)
return false
// doesn't exist. wait for it to exist. storage will resolve true if it exists or false if it timed out
return storage.store(id, resolve)
}
debugRetrigger(`[retrigger] WAIT id = ${reactionEventID}`)
emitter.once(reactionEventID, () => {
debugRetrigger(`[retrigger] TRIGGER id = ${reactionEventID}`)
fn(...rest)
})
// if the event never arrives, don't trigger the callback, just clean up
setTimeout(() => {
if (emitter.listeners(reactionEventID).length) {
debugRetrigger(`[retrigger] EXPIRE id = ${reactionEventID}`)
const GET_EVENT_PREPARED = from("event_message").select("event_id").and("WHERE event_id = ?").prepare().raw()
/**
* @param {string} eventID
* @returns {Promise<boolean>} if true then the message did not arrive
*/
function waitForEvent(eventID) {
const {promise, resolve} = Promise.withResolvers()
waitFor(eventID, resolve, !!GET_EVENT_PREPARED.get(eventID))
return promise
}
emitter.removeAllListeners(reactionEventID)
}, 60 * 1000) // 1 minute
return true // event was not found, then retrigger
const GET_MESSAGE_PREPARED = from("event_message").select("message_id").and("WHERE message_id = ?").prepare().raw()
/**
* @param {string} messageID
* @returns {Promise<boolean>} if true then the message did not arrive
*/
function waitForMessage(messageID) {
const {promise, resolve} = Promise.withResolvers()
waitFor(messageID, resolve, !!GET_MESSAGE_PREPARED.get(messageID))
return promise
}
const GET_REACTION_EVENT_PREPARED = from("reaction").select("hashed_event_id").and("WHERE hashed_event_id = ?").prepare().raw()
/**
* @param {string} eventID
* @returns {Promise<boolean>} if true then the message did not arrive
*/
function waitForReactionEvent(eventID) {
const {promise, resolve} = Promise.withResolvers()
waitFor(eventID, resolve, !!GET_REACTION_EVENT_PREPARED.get(utils.getEventIDHash(eventID)))
return promise
}
/**
* Anything calling retrigger during the callback will be paused and retriggered after the callback resolves.
* @template T
* @param {string} messageID
* @param {string} id
* @param {Promise<T>} promise
* @returns {Promise<T>}
*/
async function pauseChanges(messageID, promise) {
async function pauseChanges(id, promise) {
try {
debugRetrigger(`[retrigger] PAUSE id = ${messageID}`)
paused.add(messageID)
storage.pause(id)
return await promise
} finally {
debugRetrigger(`[retrigger] RESUME id = ${messageID}`)
paused.delete(messageID)
messageFinishedBridging(messageID)
finishedBridging(id)
}
}
/**
* Triggers any pending operations that were waiting on the corresponding event ID.
* @param {string} messageID
* @param {string} id
*/
function messageFinishedBridging(messageID) {
if (emitter.listeners(messageID).length) {
debugRetrigger(`[retrigger] EMIT id = ${messageID}`)
}
emitter.emit(messageID)
function finishedBridging(id) {
storage.resolve(id, true)
}
module.exports.eventNotFoundThenRetrigger = eventNotFoundThenRetrigger
module.exports.reactionNotFoundThenRetrigger = reactionNotFoundThenRetrigger
module.exports.messageFinishedBridging = messageFinishedBridging
module.exports.waitForMessage = waitForMessage
module.exports.waitForEvent = waitForEvent
module.exports.waitForReactionEvent = waitForReactionEvent
module.exports.pauseChanges = pauseChanges
module.exports.finishedBridging = finishedBridging

View file

@ -34,7 +34,8 @@ function removeReaction(data, reactions, key) {
// Even though the bridge bot only reacted once on Discord-side, multiple Matrix users may have
// reacted on Matrix-side. Semantically, we want to remove the reaction from EVERY Matrix user.
// Also need to clean up the database.
const hash = utils.getEventIDHash(event.event_id)
console.log("ELLIE TEST: we are removing all matrix reactions, because we got a removal from Discord")
const hash = utils.getEventIDHash(eventID)
removals.push({eventID, mxid: null, hash})
}
if (!lookingAtMatrixReaction && !wantToRemoveMatrixReaction) {

View file

@ -2,6 +2,7 @@
const assert = require("assert").strict
const DiscordTypes = require("discord-api-types/v10")
const {id: botID} = require("../../addbot")
const {sync, db, select, from} = require("../passthrough")
/** @type {import("./actions/send-message")}) */
@ -38,6 +39,8 @@ const removeMember = sync.require("./actions/remove-member")
const vote = sync.require("./actions/poll-vote")
/** @type {import("../m2d/event-dispatcher")} */
const matrixEventDispatcher = sync.require("../m2d/event-dispatcher")
/** @type {import("../m2d/actions/redact.js")} */
const redact = sync.require("../m2d/actions/redact.js")
/** @type {import("../discord/interactions/matrix-info")} */
const matrixInfoInteraction = sync.require("../discord/interactions/matrix-info")
/** @type {import("../agi/listener")} */
@ -321,7 +324,7 @@ module.exports = {
// @ts-ignore
await sendMessage.sendMessage(message, channel, guild, row)
retrigger.messageFinishedBridging(message.id)
retrigger.finishedBridging(message.id)
},
/**
@ -342,7 +345,7 @@ module.exports = {
if (!row) {
// Check that the sending-to room exists, and deal with Eventual Consistency(TM)
if (retrigger.eventNotFoundThenRetrigger(data.id, module.exports.MESSAGE_UPDATE, client, data)) return
if (!await retrigger.waitForMessage(data.id)) return
}
/** @type {DiscordTypes.GatewayMessageCreateDispatchData} */
@ -380,6 +383,17 @@ module.exports = {
* @param {DiscordTypes.GatewayMessageReactionRemoveDispatchData | DiscordTypes.GatewayMessageReactionRemoveEmojiDispatchData | DiscordTypes.GatewayMessageReactionRemoveAllDispatchData} data
*/
async onSomeReactionsRemoved(client, data) {
// Don't attempt to double-bridge our own deleted reactions, this would go badly if there are race conditions
if ("emoji" in data && !data.emoji.name) console.error(":(", data)
if ("user_id" in data && data.user_id === botID && data.emoji.name) { // sure hope data.emoji.name exists here
const encodedEmoji = encodeURIComponent(data.emoji.id ? `${data.emoji.name}:${data.emoji.id}` : data.emoji.name)
const i = redact.ourDeletedReactions.findIndex(x => data.message_id === x.message_id && encodedEmoji === x.encoded_emoji)
if (i !== -1) {
redact.ourDeletedReactions.splice(i, 1)
return
}
}
await removeReaction.removeSomeReactions(data)
},
@ -389,7 +403,7 @@ module.exports = {
*/
async MESSAGE_DELETE(client, data) {
speedbump.onMessageDelete(data.id)
if (retrigger.eventNotFoundThenRetrigger(data.id, module.exports.MESSAGE_DELETE, client, data)) return
if (!await retrigger.waitForMessage(data.id)) return
await deleteMessage.deleteMessage(data)
},
@ -437,12 +451,12 @@ module.exports = {
* @param {DiscordTypes.GatewayMessagePollVoteDispatchData} data
*/
async MESSAGE_POLL_VOTE_ADD(client, data) {
if (retrigger.eventNotFoundThenRetrigger(data.message_id, module.exports.MESSAGE_POLL_VOTE_ADD, client, data)) return
if (!await retrigger.waitForMessage(data.message_id)) return
await vote.addVote(data)
},
async MESSAGE_POLL_VOTE_REMOVE(client, data) {
if (retrigger.eventNotFoundThenRetrigger(data.message_id, module.exports.MESSAGE_POLL_VOTE_REMOVE, client, data)) return
if (!await retrigger.waitForMessage(data.message_id)) return
await vote.removeVote(data)
},

View file

@ -17,7 +17,7 @@ const retrigger = sync.require("../../d2m/actions/retrigger")
*/
async function addReaction(event) {
// Wait until the corresponding channel and message have already been bridged
if (retrigger.eventNotFoundThenRetrigger(event.content["m.relates_to"].event_id, () => as.emit("type:m.reaction", event))) return
if (!await retrigger.waitForEvent(event.content["m.relates_to"].event_id)) return
// These will exist because it passed retrigger
const row = from("event_message").join("message_room", "message_id").join("historical_channel_room", "historical_room_index")
@ -51,7 +51,7 @@ async function addReaction(event) {
db.prepare("REPLACE INTO reaction (hashed_event_id, message_id, encoded_emoji, original_encoding) VALUES (?, ?, ?, ?)").run(utils.getEventIDHash(event.event_id), messageID, discordPreferredEncoding, key)
retrigger.messageFinishedBridging(event.event_id)
retrigger.finishedBridging(event.event_id)
}
module.exports.addReaction = addReaction

View file

@ -10,6 +10,9 @@ const utils = sync.require("../../matrix/utils")
/** @type {import("../../d2m/actions/retrigger")} */
const retrigger = sync.require("../../d2m/actions/retrigger")
/** @type {{message_id: string, encoded_emoji: string}[]} */
const ourDeletedReactions = []
/**
* @param {Ty.Event.Outer_M_Room_Redaction} event
*/
@ -24,6 +27,21 @@ async function deleteMessage(event) {
db.prepare("DELETE FROM message_room WHERE message_id = ?").run(rows[0].message_id)
}
/**
* @param {Ty.Event.Outer_M_Room_Redaction} event
*/
async function removeMessageEvent(event) {
// Could be for removing a message or suppressing embeds. For more information, the message needs to be bridged first.
if (!await retrigger.waitForEvent(event.redacts)) return
const row = select("event_message", ["event_type", "event_subtype", "part"], {event_id: event.redacts}).get()
if (row && row.event_type === "m.room.message" && row.event_subtype === "m.notice" && row.part === 1) {
await suppressEmbeds(event)
} else {
await deleteMessage(event)
}
}
/**
* @param {Ty.Event.Outer_M_Room_Redaction} event
*/
@ -41,7 +59,7 @@ async function suppressEmbeds(event) {
* @param {Ty.Event.Outer_M_Room_Redaction} event
*/
async function removeReaction(event) {
if (retrigger.reactionNotFoundThenRetrigger(event.redacts, () => as.emit("type:m.room.redaction", event))) return
if (!await retrigger.waitForReactionEvent(event.redacts)) return
const hash = utils.getEventIDHash(event.redacts)
const row = from("reaction").join("message_room", "message_id").join("historical_channel_room", "historical_room_index")
@ -50,6 +68,7 @@ async function removeReaction(event) {
// See how many Matrix-side reactions there are, and delete if it's the last one
const numberOfReactions = from("reaction").where({message_id: row.message_id, encoded_emoji: row.encoded_emoji}).pluckUnsafe("count(*)").get()
if (numberOfReactions === 1) {
ourDeletedReactions.push(row)
await discord.snow.channel.deleteReactionSelf(row.reference_channel_id, row.message_id, row.encoded_emoji)
}
db.prepare("DELETE FROM reaction WHERE hashed_event_id = ?").run(hash)
@ -60,18 +79,12 @@ async function removeReaction(event) {
* @param {Ty.Event.Outer_M_Room_Redaction} event
*/
async function handle(event) {
// If this is for removing a reaction, try it
await removeReaction(event)
// Or, it might be for removing a message or suppressing embeds. But to do that, the message needs to be bridged first.
if (retrigger.eventNotFoundThenRetrigger(event.redacts, () => as.emit("type:m.room.redaction", event))) return
const row = select("event_message", ["event_type", "event_subtype", "part"], {event_id: event.redacts}).get()
if (row && row.event_type === "m.room.message" && row.event_subtype === "m.notice" && row.part === 1) {
await suppressEmbeds(event)
} else {
await deleteMessage(event)
}
// Don't know if it's a redaction for a reaction or an event, try both at the same time (otherwise waitFor will block)
await Promise.all([
removeMessageEvent(event),
removeReaction(event)
])
}
module.exports.handle = handle
module.exports.ourDeletedReactions = ourDeletedReactions

View file

@ -211,7 +211,7 @@ async event => {
// @ts-ignore
await matrixCommandHandler.execute(event)
}
retrigger.messageFinishedBridging(event.event_id)
retrigger.finishedBridging(event.event_id)
await api.ackEvent(event)
}))
@ -222,7 +222,7 @@ sync.addTemporaryListener(as, "type:m.sticker", guard("m.sticker",
async event => {
if (utils.eventSenderIsFromDiscord(event.sender)) return
const messageResponses = await sendEvent.sendEvent(event)
retrigger.messageFinishedBridging(event.event_id)
retrigger.finishedBridging(event.event_id)
await api.ackEvent(event)
}))

View file

@ -21,6 +21,8 @@ const speedbump = sync.require("./d2m/actions/speedbump")
const ks = sync.require("./matrix/kstate")
const setPresence = sync.require("./d2m/actions/set-presence")
const channelWebhook = sync.require("./m2d/actions/channel-webhook")
const dUtils = sync.require("./discord/utils")
const mxUtils = sync.require("./matrix/utils")
const guildID = "112760669178241024"
async function ping() {