Compare commits
2 commits
9e51482b60
...
a2f0bb79db
Author | SHA1 | Date | |
---|---|---|---|
a2f0bb79db | |||
149cee845a |
5 changed files with 103 additions and 30 deletions
|
@ -142,8 +142,19 @@ async function syncUser(author, pkMessage, roomID) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @returns {Promise<Ty.PkMessage>} */
|
/** @returns {Promise<Ty.PkMessage>} */
|
||||||
function fetchMessage(messageID) {
|
async function fetchMessage(messageID) {
|
||||||
return fetch(`https://api.pluralkit.me/v2/messages/${messageID}`).then(res => res.json())
|
// Their backend is weird. Sometimes it says "message not found" (code 20006) on the first try, so we make multiple attempts.
|
||||||
|
let attempts = 0
|
||||||
|
do {
|
||||||
|
var res = await fetch(`https://api.pluralkit.me/v2/messages/${messageID}`)
|
||||||
|
if (res.ok) return res.json()
|
||||||
|
|
||||||
|
// I think the backend needs some time to update.
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 2000))
|
||||||
|
} while (++attempts < 3)
|
||||||
|
|
||||||
|
const errorMessage = await res.json()
|
||||||
|
throw new Error(`PK API returned an error after ${attempts} tries: ${JSON.stringify(errorMessage)}`)
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports._memberToStateContent = memberToStateContent
|
module.exports._memberToStateContent = memberToStateContent
|
||||||
|
|
61
d2m/actions/retrigger.js
Normal file
61
d2m/actions/retrigger.js
Normal file
|
@ -0,0 +1,61 @@
|
||||||
|
// @ts-check
|
||||||
|
|
||||||
|
const {EventEmitter} = require("events")
|
||||||
|
const passthrough = require("../../passthrough")
|
||||||
|
const {select} = passthrough
|
||||||
|
|
||||||
|
const DEBUG_RETRIGGER = false
|
||||||
|
|
||||||
|
function debugRetrigger(message) {
|
||||||
|
if (DEBUG_RETRIGGER) {
|
||||||
|
console.log(message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const emitter = new EventEmitter()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Due to Eventual Consistency(TM) an update/delete may arrive before the original message arrives
|
||||||
|
* (or before the it has finished being bridged to an event).
|
||||||
|
* In this case, wait until the original message has finished bridging, then retrigger the passed function.
|
||||||
|
* @template {(...args: any) => Promise<any>} T
|
||||||
|
* @param {string} messageID
|
||||||
|
* @param {T} fn
|
||||||
|
* @param {Parameters<T>} rest
|
||||||
|
* @returns {boolean} false if the event was found and the function will be ignored, true if the event was not found and the function will be retriggered
|
||||||
|
*/
|
||||||
|
function eventNotFoundThenRetrigger(messageID, fn, ...rest) {
|
||||||
|
const eventID = select("event_message", "event_id", {message_id: messageID}).pluck().get()
|
||||||
|
if (eventID) {
|
||||||
|
debugRetrigger(`[retrigger] OK mid <-> eid = ${messageID} <-> ${eventID}`)
|
||||||
|
return false // event was found so don't retrigger
|
||||||
|
}
|
||||||
|
|
||||||
|
debugRetrigger(`[retrigger] WAIT mid <-> eid = ${messageID} <-> ${eventID}`)
|
||||||
|
emitter.addListener(messageID, () => {
|
||||||
|
debugRetrigger(`[retrigger] TRIGGER mid = ${messageID}`)
|
||||||
|
fn(...rest)
|
||||||
|
})
|
||||||
|
// if the event never arrives, don't trigger the callback, just clean up
|
||||||
|
setTimeout(() => {
|
||||||
|
if (emitter.listeners(messageID).length) {
|
||||||
|
debugRetrigger(`[retrigger] EXPIRE mid = ${messageID}`)
|
||||||
|
}
|
||||||
|
emitter.removeAllListeners(messageID)
|
||||||
|
}, 60 * 1000) // 1 minute
|
||||||
|
return true // event was not found, then retrigger
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Triggers any pending operations that were waiting on the corresponding event ID.
|
||||||
|
* @param {string} messageID
|
||||||
|
*/
|
||||||
|
function messageFinishedBridging(messageID) {
|
||||||
|
if (emitter.listeners(messageID).length) {
|
||||||
|
debugRetrigger(`[retrigger] EMIT mid = ${messageID}`)
|
||||||
|
}
|
||||||
|
emitter.emit(messageID)
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports.eventNotFoundThenRetrigger = eventNotFoundThenRetrigger
|
||||||
|
module.exports.messageFinishedBridging = messageFinishedBridging
|
|
@ -39,15 +39,8 @@ async function sendMessage(message, channel, guild, row) {
|
||||||
} else if (row && row.speedbump_webhook_id === message.webhook_id) {
|
} else if (row && row.speedbump_webhook_id === message.webhook_id) {
|
||||||
// Handle the PluralKit public instance
|
// Handle the PluralKit public instance
|
||||||
if (row.speedbump_id === "466378653216014359") {
|
if (row.speedbump_id === "466378653216014359") {
|
||||||
const root = await registerPkUser.fetchMessage(message.id)
|
const pkMessage = await registerPkUser.fetchMessage(message.id)
|
||||||
// Member is null if member was deleted. We just got this message, so member surely exists.
|
senderMxid = await registerPkUser.syncUser(message.author, pkMessage, roomID)
|
||||||
if (!root.member) {
|
|
||||||
const e = new Error("PK API did not return a member")
|
|
||||||
message["__pk_response__"] = root
|
|
||||||
console.error(root)
|
|
||||||
throw e
|
|
||||||
}
|
|
||||||
senderMxid = await registerPkUser.syncUser(message.author, root, roomID)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,8 @@ function eventCanBeEdited(ev) {
|
||||||
* @param {import("../../matrix/api")} api simple-as-nails dependency injection for the matrix API
|
* @param {import("../../matrix/api")} api simple-as-nails dependency injection for the matrix API
|
||||||
*/
|
*/
|
||||||
async function editToChanges(message, guild, api) {
|
async function editToChanges(message, guild, api) {
|
||||||
// If it is a user edit, allow deleting old messages (e.g. they might have removed text from an image). If it is the system adding a generated embed to a message, don't delete old messages since the system only sends partial data.
|
// If it is a user edit, allow deleting old messages (e.g. they might have removed text from an image).
|
||||||
|
// If it is the system adding a generated embed to a message, don't delete old messages since the system only sends partial data.
|
||||||
|
|
||||||
const isGeneratedEmbed = !("content" in message)
|
const isGeneratedEmbed = !("content" in message)
|
||||||
|
|
||||||
|
@ -38,22 +39,21 @@ async function editToChanges(message, guild, api) {
|
||||||
|
|
||||||
const roomID = select("channel_room", "room_id", {channel_id: message.channel_id}).pluck().get()
|
const roomID = select("channel_room", "room_id", {channel_id: message.channel_id}).pluck().get()
|
||||||
assert(roomID)
|
assert(roomID)
|
||||||
|
const oldEventRows = select("event_message", ["event_id", "event_type", "event_subtype", "part", "reaction_part"], {message_id: message.id}).all()
|
||||||
|
|
||||||
/** @type {string?} Null if we don't have a sender in the room, which will happen if it's a webhook's message. The bridge bot will do the edit instead. */
|
/** @type {string?} Null if we don't have a sender in the room, which will happen if it's a webhook's message. The bridge bot will do the edit instead. */
|
||||||
let senderMxid = null
|
let senderMxid = null
|
||||||
if (message.author) {
|
if (message.author) {
|
||||||
senderMxid = from("sim").join("sim_member", "mxid").where({user_id: message.author.id, room_id: roomID}).pluck("mxid").get() || null
|
senderMxid = from("sim").join("sim_member", "mxid").where({user_id: message.author.id, room_id: roomID}).pluck("mxid").get() || null
|
||||||
} else {
|
} else {
|
||||||
// Should be a system generated embed. We want the embed to be sent by the same user who sent the message, so that the messages get grouped in most clients.
|
// Should be a system generated embed. We want the embed to be sent by the same user who sent the message, so that the messages get grouped in most clients.
|
||||||
const eventID = select("event_message", "event_id", {message_id: message.id}).pluck().get()
|
const eventID = oldEventRows[0].event_id // a calling function should have already checked that there is at least one message to edit
|
||||||
assert(eventID) // this should have been checked earlier in a calling function
|
|
||||||
const event = await api.getEvent(roomID, eventID)
|
const event = await api.getEvent(roomID, eventID)
|
||||||
if (utils.eventSenderIsFromDiscord(event.sender)) {
|
if (utils.eventSenderIsFromDiscord(event.sender)) {
|
||||||
senderMxid = event.sender
|
senderMxid = event.sender
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const oldEventRows = select("event_message", ["event_id", "event_type", "event_subtype", "part", "reaction_part"], {message_id: message.id}).all()
|
|
||||||
|
|
||||||
// Figure out what we will be replacing them with
|
// Figure out what we will be replacing them with
|
||||||
|
|
||||||
const newFallbackContent = await messageToEvent.messageToEvent(message, guild, {includeEditFallbackStar: true}, {api})
|
const newFallbackContent = await messageToEvent.messageToEvent(message, guild, {includeEditFallbackStar: true}, {api})
|
||||||
|
|
|
@ -33,6 +33,8 @@ const discordCommandHandler = sync.require("../discord/discord-command-handler")
|
||||||
const mxUtils = require("../m2d/converters/utils")
|
const mxUtils = require("../m2d/converters/utils")
|
||||||
/** @type {import("./actions/speedbump")} */
|
/** @type {import("./actions/speedbump")} */
|
||||||
const speedbump = sync.require("./actions/speedbump")
|
const speedbump = sync.require("./actions/speedbump")
|
||||||
|
/** @type {import("./actions/retrigger")} */
|
||||||
|
const retrigger = sync.require("./actions/retrigger")
|
||||||
|
|
||||||
/** @type {any} */ // @ts-ignore bad types from semaphore
|
/** @type {any} */ // @ts-ignore bad types from semaphore
|
||||||
const Semaphore = require("@chriscdn/promise-semaphore")
|
const Semaphore = require("@chriscdn/promise-semaphore")
|
||||||
|
@ -252,6 +254,8 @@ module.exports = {
|
||||||
// @ts-ignore
|
// @ts-ignore
|
||||||
await sendMessage.sendMessage(message, channel, guild, row),
|
await sendMessage.sendMessage(message, channel, guild, row),
|
||||||
await discordCommandHandler.execute(message, channel, guild)
|
await discordCommandHandler.execute(message, channel, guild)
|
||||||
|
|
||||||
|
retrigger.messageFinishedBridging(message.id)
|
||||||
},
|
},
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -259,6 +263,14 @@ module.exports = {
|
||||||
* @param {DiscordTypes.GatewayMessageUpdateDispatchData} data
|
* @param {DiscordTypes.GatewayMessageUpdateDispatchData} data
|
||||||
*/
|
*/
|
||||||
async onMessageUpdate(client, data) {
|
async onMessageUpdate(client, data) {
|
||||||
|
// Based on looking at data they've sent me over the gateway, this is the best way to check for meaningful changes.
|
||||||
|
// If the message content is a string then it includes all interesting fields and is meaningful.
|
||||||
|
// Otherwise, if there are embeds, then the system generated URL preview embeds.
|
||||||
|
if (!(typeof data.content === "string" || "embeds" in data)) return
|
||||||
|
|
||||||
|
// Deal with Eventual Consistency(TM)
|
||||||
|
if (retrigger.eventNotFoundThenRetrigger(data.id, module.exports.onMessageUpdate, client, data)) return
|
||||||
|
|
||||||
if (data.webhook_id) {
|
if (data.webhook_id) {
|
||||||
const row = select("webhook", "webhook_id", {webhook_id: data.webhook_id}).pluck().get()
|
const row = select("webhook", "webhook_id", {webhook_id: data.webhook_id}).pluck().get()
|
||||||
if (row) return // The message was sent by the bridge's own webhook on discord. We don't want to reflect this back, so just drop it.
|
if (row) return // The message was sent by the bridge's own webhook on discord. We don't want to reflect this back, so just drop it.
|
||||||
|
@ -270,10 +282,6 @@ module.exports = {
|
||||||
const {affected, row} = await speedbump.maybeDoSpeedbump(data.channel_id, data.id)
|
const {affected, row} = await speedbump.maybeDoSpeedbump(data.channel_id, data.id)
|
||||||
if (affected) return
|
if (affected) return
|
||||||
|
|
||||||
// Based on looking at data they've sent me over the gateway, this is the best way to check for meaningful changes.
|
|
||||||
// If the message content is a string then it includes all interesting fields and is meaningful.
|
|
||||||
// Otherwise, if there are embeds, then the system generated URL preview embeds.
|
|
||||||
if (typeof data.content === "string" || "embeds" in data) {
|
|
||||||
/** @type {DiscordTypes.GatewayMessageCreateDispatchData} */
|
/** @type {DiscordTypes.GatewayMessageCreateDispatchData} */
|
||||||
// @ts-ignore
|
// @ts-ignore
|
||||||
const message = data
|
const message = data
|
||||||
|
@ -284,7 +292,6 @@ module.exports = {
|
||||||
assert(guild)
|
assert(guild)
|
||||||
// @ts-ignore
|
// @ts-ignore
|
||||||
await editMessage.editMessage(message, guild, row)
|
await editMessage.editMessage(message, guild, row)
|
||||||
}
|
|
||||||
},
|
},
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -311,6 +318,7 @@ module.exports = {
|
||||||
*/
|
*/
|
||||||
async onMessageDelete(client, data) {
|
async onMessageDelete(client, data) {
|
||||||
speedbump.onMessageDelete(data.id)
|
speedbump.onMessageDelete(data.id)
|
||||||
|
if (retrigger.eventNotFoundThenRetrigger(data.id, module.exports.onMessageDelete, client, data)) return
|
||||||
await deleteMessage.deleteMessage(data)
|
await deleteMessage.deleteMessage(data)
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue