diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts index 0230c9a7b8..2ee61eb549 100644 --- a/packages/backend/src/core/QueueService.ts +++ b/packages/backend/src/core/QueueService.ts @@ -278,14 +278,14 @@ export class QueueService { } @bindThis - public createImportMastoToDbJob(user: ThinUser, targets: string[]) { - const jobs = targets.map(rel => this.generateToDbJobData('importMastoToDb', { user, target: rel })); + public createImportMastoToDbJob(user: ThinUser, targets: string[], note: MiNote['id'] | null) { + const jobs = targets.map(rel => this.generateToDbJobData('importMastoToDb', { user, target: rel, note })); return this.dbQueue.addBulk(jobs); } @bindThis - public createImportPleroToDbJob(user: ThinUser, targets: string[]) { - const jobs = targets.map(rel => this.generateToDbJobData('importPleroToDb', { user, target: rel })); + public createImportPleroToDbJob(user: ThinUser, targets: string[], note: MiNote['id'] | null) { + const jobs = targets.map(rel => this.generateToDbJobData('importPleroToDb', { user, target: rel, note })); return this.dbQueue.addBulk(jobs); } diff --git a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts index cbe1d41e35..552b69d92d 100644 --- a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts @@ -17,7 +17,7 @@ import { extractApHashtagObjects } from '@/core/activitypub/models/tag.js'; import { IdService } from '@/core/IdService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; -import type { DbNoteImportToDbJobData, DbNoteImportJobData, DbKeyNoteImportToDbJobData } from '../types.js'; +import type { DbNoteImportToDbJobData, DbNoteImportJobData, DbNoteWithParentImportToDbJobData } from '../types.js'; @Injectable() export class ImportNotesProcessorService { @@ -74,7 +74,7 @@ export class ImportNotesProcessorService { // Function was taken from Firefish and modified for our needs @bindThis - private async recreateChain(idField: string, replyField: string, arr: any[]): Promise { + private async recreateChain(idFieldPath: string[], replyFieldPath: string[], arr: any[], includeOrphans: boolean): Promise { type NotesMap = { [id: string]: any; }; @@ -83,28 +83,42 @@ export class ImportNotesProcessorService { const notesWaitingForParent: NotesMap = {}; for await (const note of arr) { - noteById[note[idField]] = note; + const noteId = idFieldPath.reduce( + (obj, step) => obj[step], + note, + ); + + noteById[noteId] = note; note.childNotes = []; - const children = notesWaitingForParent[note[idField]]; + const children = notesWaitingForParent[noteId]; if (children) { note.childNotes.push(...children); + delete notesWaitingForParent[noteId]; } - if (note[replyField] == null) { + const noteReplyId = replyFieldPath.reduce( + (obj, step) => obj[step], + note, + ); + if (noteReplyId == null) { notesTree.push(note); continue; } - const parent = noteById[note[replyField]]; + const parent = noteById[noteReplyId]; if (parent) { parent.childNotes.push(note); } else { - notesWaitingForParent[note[replyField]] ||= []; - notesWaitingForParent[note[replyField]].push(note); + notesWaitingForParent[noteReplyId] ||= []; + notesWaitingForParent[noteReplyId].push(note); } } + if (includeOrphans) { + notesTree.push(...Object.values(notesWaitingForParent).flat(1)); + } + return notesTree; } @@ -176,7 +190,7 @@ export class ImportNotesProcessorService { const tweets = Object.keys(fakeWindow.window.YTD.tweets.part0).reduce((m, key, i, obj) => { return m.concat(fakeWindow.window.YTD.tweets.part0[key].tweet); }, []); - const processedTweets = await this.recreateChain('id_str', 'in_reply_to_status_id_str', tweets); + const processedTweets = await this.recreateChain(['id_str'], ['in_reply_to_status_id_str'], tweets, false); this.queueService.createImportTweetsToDbJob(job.data.user, processedTweets, null); } finally { cleanup(); @@ -254,7 +268,8 @@ export class ImportNotesProcessorService { if (isPleroma) { const outboxJson = fs.readFileSync(outputPath + '/outbox.json', 'utf-8'); const outbox = JSON.parse(outboxJson); - this.queueService.createImportPleroToDbJob(job.data.user, outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note')); + const processedToots = await this.recreateChain(['object', 'id'], ['object', 'inReplyTo'], outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note'), true); + this.queueService.createImportPleroToDbJob(job.data.user, processedToots, null); } else { const outboxJson = fs.readFileSync(outputPath + '/outbox.json', 'utf-8'); const outbox = JSON.parse(outboxJson); @@ -266,7 +281,8 @@ export class ImportNotesProcessorService { if (fs.existsSync(outputPath + '/media_attachments/files') && mastoFolder) { await this.uploadFiles(outputPath + '/media_attachments/files', user, mastoFolder.id); } - this.queueService.createImportMastoToDbJob(job.data.user, outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note')); + const processedToots = await this.recreateChain(['object', 'id'], ['object', 'inReplyTo'], outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note'), true); + this.queueService.createImportMastoToDbJob(job.data.user, processedToots, null); } } } finally { @@ -289,7 +305,7 @@ export class ImportNotesProcessorService { const notesJson = fs.readFileSync(path, 'utf-8'); const notes = JSON.parse(notesJson); - const processedNotes = await this.recreateChain('id', 'replyId', notes); + const processedNotes = await this.recreateChain(['id'], ['replyId'], notes, false); this.queueService.createImportKeyNotesToDbJob(job.data.user, processedNotes, null); cleanup(); } @@ -298,7 +314,7 @@ export class ImportNotesProcessorService { } @bindThis - public async processKeyNotesToDb(job: Bull.Job): Promise { + public async processKeyNotesToDb(job: Bull.Job): Promise { const note = job.data.target; const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { @@ -355,28 +371,33 @@ export class ImportNotesProcessorService { } @bindThis - public async processMastoToDb(job: Bull.Job): Promise { + public async processMastoToDb(job: Bull.Job): Promise { const toot = job.data.target; const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { return; } + if (toot.directMessage) return; + const date = new Date(toot.object.published); let text = undefined; const files: MiDriveFile[] = []; let reply: MiNote | null = null; if (toot.object.inReplyTo != null) { - try { - reply = await this.apNoteService.resolveNote(toot.object.inReplyTo); - } catch (error) { - reply = null; + const parentNote = job.data.note ? await this.notesRepository.findOneBy({ id: job.data.note }) : null; + if (parentNote) { + reply = parentNote; + } else { + try { + reply = await this.apNoteService.resolveNote(toot.object.inReplyTo); + } catch (error) { + reply = null; + } } } - if (toot.directMessage) return; - const hashtags = extractApHashtagObjects(toot.object.tag).map((x) => x.name).filter((x): x is string => x != null); try { @@ -396,17 +417,20 @@ export class ImportNotesProcessorService { } } - await this.noteCreateService.import(user, { createdAt: date, text: text, files: files, apMentions: new Array(0), cw: toot.object.sensitive ? toot.object.summary : null, reply: reply }); + const createdNote = await this.noteCreateService.import(user, { createdAt: date, text: text, files: files, apMentions: new Array(0), cw: toot.object.sensitive ? toot.object.summary : null, reply: reply }); + if (toot.childNotes) this.queueService.createImportMastoToDbJob(user, toot.childNotes, createdNote.id); } @bindThis - public async processPleroToDb(job: Bull.Job): Promise { + public async processPleroToDb(job: Bull.Job): Promise { const post = job.data.target; const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { return; } + if (post.directMessage) return; + const date = new Date(post.object.published); let text = undefined; const files: MiDriveFile[] = []; @@ -416,15 +440,18 @@ export class ImportNotesProcessorService { if (folder == null) return; if (post.object.inReplyTo != null) { - try { - reply = await this.apNoteService.resolveNote(post.object.inReplyTo); - } catch (error) { - reply = null; + const parentNote = job.data.note ? await this.notesRepository.findOneBy({ id: job.data.note }) : null; + if (parentNote) { + reply = parentNote; + } else { + try { + reply = await this.apNoteService.resolveNote(post.object.inReplyTo); + } catch (error) { + reply = null; + } } } - if (post.directMessage) return; - const hashtags = extractApHashtagObjects(post.object.tag).map((x) => x.name).filter((x): x is string => x != null); try { @@ -468,7 +495,8 @@ export class ImportNotesProcessorService { } } - await this.noteCreateService.import(user, { createdAt: date, text: text, files: files, apMentions: new Array(0), cw: post.object.sensitive ? post.object.summary : null, reply: reply }); + const createdNote = await this.noteCreateService.import(user, { createdAt: date, text: text, files: files, apMentions: new Array(0), cw: post.object.sensitive ? post.object.summary : null, reply: reply }); + if (post.childNotes) this.queueService.createImportPleroToDbJob(user, post.childNotes, createdNote.id); } @bindThis @@ -517,7 +545,7 @@ export class ImportNotesProcessorService { } @bindThis - public async processTwitterDb(job: Bull.Job): Promise { + public async processTwitterDb(job: Bull.Job): Promise { const tweet = job.data.target; const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index 8d09e4e197..432b3d364f 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -50,12 +50,12 @@ export type DbJobMap = { exportUserLists: DbJobDataWithUser; importAntennas: DBAntennaImportJobData; importNotes: DbNoteImportJobData; - importTweetsToDb: DbKeyNoteImportToDbJobData; + importTweetsToDb: DbNoteWithParentImportToDbJobData; importIGToDb: DbNoteImportToDbJobData; importFBToDb: DbNoteImportToDbJobData; - importMastoToDb: DbNoteImportToDbJobData; - importPleroToDb: DbNoteImportToDbJobData; - importKeyNotesToDb: DbKeyNoteImportToDbJobData; + importMastoToDb: DbNoteWithParentImportToDbJobData; + importPleroToDb: DbNoteWithParentImportToDbJobData; + importKeyNotesToDb: DbNoteWithParentImportToDbJobData; importFollowing: DbUserImportJobData; importFollowingToDb: DbUserImportToDbJobData; importMuting: DbUserImportJobData; @@ -113,7 +113,7 @@ export type DbNoteImportToDbJobData = { target: any; }; -export type DbKeyNoteImportToDbJobData = { +export type DbNoteWithParentImportToDbJobData = { user: ThinUser; target: any; note: MiNote['id'] | null;