Compare commits

...

2 commits

5 changed files with 103 additions and 30 deletions

View file

@ -142,8 +142,19 @@ async function syncUser(author, pkMessage, roomID) {
} }
/** @returns {Promise<Ty.PkMessage>} */ /** @returns {Promise<Ty.PkMessage>} */
function fetchMessage(messageID) { async function fetchMessage(messageID) {
return fetch(`https://api.pluralkit.me/v2/messages/${messageID}`).then(res => res.json()) // Their backend is weird. Sometimes it says "message not found" (code 20006) on the first try, so we make multiple attempts.
let attempts = 0
do {
var res = await fetch(`https://api.pluralkit.me/v2/messages/${messageID}`)
if (res.ok) return res.json()
// I think the backend needs some time to update.
await new Promise(resolve => setTimeout(resolve, 2000))
} while (++attempts < 3)
const errorMessage = await res.json()
throw new Error(`PK API returned an error after ${attempts} tries: ${JSON.stringify(errorMessage)}`)
} }
module.exports._memberToStateContent = memberToStateContent module.exports._memberToStateContent = memberToStateContent

61
d2m/actions/retrigger.js Normal file
View file

@ -0,0 +1,61 @@
// @ts-check
const {EventEmitter} = require("events")
const passthrough = require("../../passthrough")
const {select} = passthrough
const DEBUG_RETRIGGER = false
function debugRetrigger(message) {
if (DEBUG_RETRIGGER) {
console.log(message)
}
}
const emitter = new EventEmitter()
/**
* Due to Eventual Consistency(TM) an update/delete may arrive before the original message arrives
* (or before the it has finished being bridged to an event).
* In this case, wait until the original message has finished bridging, then retrigger the passed function.
* @template {(...args: any) => Promise<any>} T
* @param {string} messageID
* @param {T} fn
* @param {Parameters<T>} rest
* @returns {boolean} false if the event was found and the function will be ignored, true if the event was not found and the function will be retriggered
*/
function eventNotFoundThenRetrigger(messageID, fn, ...rest) {
const eventID = select("event_message", "event_id", {message_id: messageID}).pluck().get()
if (eventID) {
debugRetrigger(`[retrigger] OK mid <-> eid = ${messageID} <-> ${eventID}`)
return false // event was found so don't retrigger
}
debugRetrigger(`[retrigger] WAIT mid <-> eid = ${messageID} <-> ${eventID}`)
emitter.addListener(messageID, () => {
debugRetrigger(`[retrigger] TRIGGER mid = ${messageID}`)
fn(...rest)
})
// if the event never arrives, don't trigger the callback, just clean up
setTimeout(() => {
if (emitter.listeners(messageID).length) {
debugRetrigger(`[retrigger] EXPIRE mid = ${messageID}`)
}
emitter.removeAllListeners(messageID)
}, 60 * 1000) // 1 minute
return true // event was not found, then retrigger
}
/**
* Triggers any pending operations that were waiting on the corresponding event ID.
* @param {string} messageID
*/
function messageFinishedBridging(messageID) {
if (emitter.listeners(messageID).length) {
debugRetrigger(`[retrigger] EMIT mid = ${messageID}`)
}
emitter.emit(messageID)
}
module.exports.eventNotFoundThenRetrigger = eventNotFoundThenRetrigger
module.exports.messageFinishedBridging = messageFinishedBridging

View file

@ -39,15 +39,8 @@ async function sendMessage(message, channel, guild, row) {
} else if (row && row.speedbump_webhook_id === message.webhook_id) { } else if (row && row.speedbump_webhook_id === message.webhook_id) {
// Handle the PluralKit public instance // Handle the PluralKit public instance
if (row.speedbump_id === "466378653216014359") { if (row.speedbump_id === "466378653216014359") {
const root = await registerPkUser.fetchMessage(message.id) const pkMessage = await registerPkUser.fetchMessage(message.id)
// Member is null if member was deleted. We just got this message, so member surely exists. senderMxid = await registerPkUser.syncUser(message.author, pkMessage, roomID)
if (!root.member) {
const e = new Error("PK API did not return a member")
message["__pk_response__"] = root
console.error(root)
throw e
}
senderMxid = await registerPkUser.syncUser(message.author, root, roomID)
} }
} }

View file

@ -30,7 +30,8 @@ function eventCanBeEdited(ev) {
* @param {import("../../matrix/api")} api simple-as-nails dependency injection for the matrix API * @param {import("../../matrix/api")} api simple-as-nails dependency injection for the matrix API
*/ */
async function editToChanges(message, guild, api) { async function editToChanges(message, guild, api) {
// If it is a user edit, allow deleting old messages (e.g. they might have removed text from an image). If it is the system adding a generated embed to a message, don't delete old messages since the system only sends partial data. // If it is a user edit, allow deleting old messages (e.g. they might have removed text from an image).
// If it is the system adding a generated embed to a message, don't delete old messages since the system only sends partial data.
const isGeneratedEmbed = !("content" in message) const isGeneratedEmbed = !("content" in message)
@ -38,22 +39,21 @@ async function editToChanges(message, guild, api) {
const roomID = select("channel_room", "room_id", {channel_id: message.channel_id}).pluck().get() const roomID = select("channel_room", "room_id", {channel_id: message.channel_id}).pluck().get()
assert(roomID) assert(roomID)
const oldEventRows = select("event_message", ["event_id", "event_type", "event_subtype", "part", "reaction_part"], {message_id: message.id}).all()
/** @type {string?} Null if we don't have a sender in the room, which will happen if it's a webhook's message. The bridge bot will do the edit instead. */ /** @type {string?} Null if we don't have a sender in the room, which will happen if it's a webhook's message. The bridge bot will do the edit instead. */
let senderMxid = null let senderMxid = null
if (message.author) { if (message.author) {
senderMxid = from("sim").join("sim_member", "mxid").where({user_id: message.author.id, room_id: roomID}).pluck("mxid").get() || null senderMxid = from("sim").join("sim_member", "mxid").where({user_id: message.author.id, room_id: roomID}).pluck("mxid").get() || null
} else { } else {
// Should be a system generated embed. We want the embed to be sent by the same user who sent the message, so that the messages get grouped in most clients. // Should be a system generated embed. We want the embed to be sent by the same user who sent the message, so that the messages get grouped in most clients.
const eventID = select("event_message", "event_id", {message_id: message.id}).pluck().get() const eventID = oldEventRows[0].event_id // a calling function should have already checked that there is at least one message to edit
assert(eventID) // this should have been checked earlier in a calling function
const event = await api.getEvent(roomID, eventID) const event = await api.getEvent(roomID, eventID)
if (utils.eventSenderIsFromDiscord(event.sender)) { if (utils.eventSenderIsFromDiscord(event.sender)) {
senderMxid = event.sender senderMxid = event.sender
} }
} }
const oldEventRows = select("event_message", ["event_id", "event_type", "event_subtype", "part", "reaction_part"], {message_id: message.id}).all()
// Figure out what we will be replacing them with // Figure out what we will be replacing them with
const newFallbackContent = await messageToEvent.messageToEvent(message, guild, {includeEditFallbackStar: true}, {api}) const newFallbackContent = await messageToEvent.messageToEvent(message, guild, {includeEditFallbackStar: true}, {api})

View file

@ -33,6 +33,8 @@ const discordCommandHandler = sync.require("../discord/discord-command-handler")
const mxUtils = require("../m2d/converters/utils") const mxUtils = require("../m2d/converters/utils")
/** @type {import("./actions/speedbump")} */ /** @type {import("./actions/speedbump")} */
const speedbump = sync.require("./actions/speedbump") const speedbump = sync.require("./actions/speedbump")
/** @type {import("./actions/retrigger")} */
const retrigger = sync.require("./actions/retrigger")
/** @type {any} */ // @ts-ignore bad types from semaphore /** @type {any} */ // @ts-ignore bad types from semaphore
const Semaphore = require("@chriscdn/promise-semaphore") const Semaphore = require("@chriscdn/promise-semaphore")
@ -252,6 +254,8 @@ module.exports = {
// @ts-ignore // @ts-ignore
await sendMessage.sendMessage(message, channel, guild, row), await sendMessage.sendMessage(message, channel, guild, row),
await discordCommandHandler.execute(message, channel, guild) await discordCommandHandler.execute(message, channel, guild)
retrigger.messageFinishedBridging(message.id)
}, },
/** /**
@ -259,6 +263,14 @@ module.exports = {
* @param {DiscordTypes.GatewayMessageUpdateDispatchData} data * @param {DiscordTypes.GatewayMessageUpdateDispatchData} data
*/ */
async onMessageUpdate(client, data) { async onMessageUpdate(client, data) {
// Based on looking at data they've sent me over the gateway, this is the best way to check for meaningful changes.
// If the message content is a string then it includes all interesting fields and is meaningful.
// Otherwise, if there are embeds, then the system generated URL preview embeds.
if (!(typeof data.content === "string" || "embeds" in data)) return
// Deal with Eventual Consistency(TM)
if (retrigger.eventNotFoundThenRetrigger(data.id, module.exports.onMessageUpdate, client, data)) return
if (data.webhook_id) { if (data.webhook_id) {
const row = select("webhook", "webhook_id", {webhook_id: data.webhook_id}).pluck().get() const row = select("webhook", "webhook_id", {webhook_id: data.webhook_id}).pluck().get()
if (row) return // The message was sent by the bridge's own webhook on discord. We don't want to reflect this back, so just drop it. if (row) return // The message was sent by the bridge's own webhook on discord. We don't want to reflect this back, so just drop it.
@ -270,10 +282,6 @@ module.exports = {
const {affected, row} = await speedbump.maybeDoSpeedbump(data.channel_id, data.id) const {affected, row} = await speedbump.maybeDoSpeedbump(data.channel_id, data.id)
if (affected) return if (affected) return
// Based on looking at data they've sent me over the gateway, this is the best way to check for meaningful changes.
// If the message content is a string then it includes all interesting fields and is meaningful.
// Otherwise, if there are embeds, then the system generated URL preview embeds.
if (typeof data.content === "string" || "embeds" in data) {
/** @type {DiscordTypes.GatewayMessageCreateDispatchData} */ /** @type {DiscordTypes.GatewayMessageCreateDispatchData} */
// @ts-ignore // @ts-ignore
const message = data const message = data
@ -284,7 +292,6 @@ module.exports = {
assert(guild) assert(guild)
// @ts-ignore // @ts-ignore
await editMessage.editMessage(message, guild, row) await editMessage.editMessage(message, guild, row)
}
}, },
/** /**
@ -311,6 +318,7 @@ module.exports = {
*/ */
async onMessageDelete(client, data) { async onMessageDelete(client, data) {
speedbump.onMessageDelete(data.id) speedbump.onMessageDelete(data.id)
if (retrigger.eventNotFoundThenRetrigger(data.id, module.exports.onMessageDelete, client, data)) return
await deleteMessage.deleteMessage(data) await deleteMessage.deleteMessage(data)
}, },