diff --git a/src/misc/antenna-cache.ts b/src/misc/antenna-cache.ts new file mode 100644 index 0000000000..6be1c18db5 --- /dev/null +++ b/src/misc/antenna-cache.ts @@ -0,0 +1,36 @@ +import { Antennas } from '../models'; +import { Antenna } from '../models/entities/antenna'; +import { subsdcriber } from '../db/redis'; + +let antennasFetched = false; +let antennas: Antenna[] = []; + +export async function getAntennas() { + if (!antennasFetched) { + antennas = await Antennas.find(); + antennasFetched = true; + } + + return antennas; +} + +subsdcriber.on('message', async (_, data) => { + const obj = JSON.parse(data); + + if (obj.channel === 'internal') { + const { type, body } = obj.message; + switch (type) { + case 'antennaCreated': + antennas.push(body); + break; + case 'antennaUpdated': + antennas[antennas.findIndex(a => a.id === body.id)] = body; + break; + case 'antennaDeleted': + antennas = antennas.filter(a => a.id !== body.id); + break; + default: + break; + } + } +}); diff --git a/src/misc/check-hit-antenna.ts b/src/misc/check-hit-antenna.ts index 0464f14131..9897dd414e 100644 --- a/src/misc/check-hit-antenna.ts +++ b/src/misc/check-hit-antenna.ts @@ -4,18 +4,24 @@ import { User } from '../models/entities/user'; import { UserListJoinings, UserGroupJoinings } from '../models'; import parseAcct from './acct/parse'; import { getFullApAccount } from './convert-host'; +import { PackedNote } from '../models/repositories/note'; -export async function checkHitAntenna(antenna: Antenna, note: Note, noteUser: User, followers: User['id'][]): Promise { +/** + * noteUserFollowers / antennaUserFollowing はどちらか一方が指定されていればよい + */ +export async function checkHitAntenna(antenna: Antenna, note: (Note | PackedNote), noteUser: { username: string; host: string | null; }, noteUserFollowers?: User['id'][], antennaUserFollowing?: User['id'][]): Promise { if (note.visibility === 'specified') return false; if (note.visibility === 'followers') { - if (!followers.includes(antenna.userId)) return false; + if (noteUserFollowers && !noteUserFollowers.includes(antenna.userId)) return false; + if (antennaUserFollowing && !antennaUserFollowing.includes(note.userId)) return false; } if (!antenna.withReplies && note.replyId != null) return false; if (antenna.src === 'home') { - if (!followers.includes(antenna.userId)) return false; + if (noteUserFollowers && !noteUserFollowers.includes(antenna.userId)) return false; + if (antennaUserFollowing && !antennaUserFollowing.includes(note.userId)) return false; } else if (antenna.src === 'list') { const listUsers = (await UserListJoinings.find({ userListId: antenna.userListId! @@ -75,7 +81,7 @@ export async function checkHitAntenna(antenna: Antenna, note: Note, noteUser: Us } if (antenna.withFile) { - if (note.fileIds.length === 0) return false; + if (note.fileIds && note.fileIds.length === 0) return false; } // TODO: eval expression diff --git a/src/models/repositories/user.ts b/src/models/repositories/user.ts index 53c06f3f16..3d897e1663 100644 --- a/src/models/repositories/user.ts +++ b/src/models/repositories/user.ts @@ -6,6 +6,7 @@ import config from '../../config'; import { SchemaType } from '../../misc/schema'; import { awaitAll } from '../../prelude/await-all'; import { populateEmojis } from '../../misc/populate-emojis'; +import { getAntennas } from '../../misc/antenna-cache'; export type PackedUser = SchemaType; @@ -97,10 +98,10 @@ export class UserRepository extends Repository { } public async getHasUnreadAntenna(userId: User['id']): Promise { - const antennas = await Antennas.find({ userId }); + const myAntennas = (await getAntennas()).filter(a => a.userId === userId); - const unread = antennas.length > 0 ? await AntennaNotes.findOne({ - antennaId: In(antennas.map(x => x.id)), + const unread = myAntennas.length > 0 ? await AntennaNotes.findOne({ + antennaId: In(myAntennas.map(x => x.id)), read: false }) : null; diff --git a/src/server/api/endpoints/antennas/create.ts b/src/server/api/endpoints/antennas/create.ts index b5f0eead2a..7210c992cd 100644 --- a/src/server/api/endpoints/antennas/create.ts +++ b/src/server/api/endpoints/antennas/create.ts @@ -4,6 +4,7 @@ import { genId } from '../../../../misc/gen-id'; import { Antennas, UserLists, UserGroupJoinings } from '../../../../models'; import { ID } from '../../../../misc/cafy-id'; import { ApiError } from '../../error'; +import { publishInternalEvent } from '../../../../services/stream'; export const meta = { desc: { @@ -108,7 +109,7 @@ export default define(meta, async (ps, user) => { } } - const antenna = await Antennas.save({ + const antenna = await Antennas.insert({ id: genId(), createdAt: new Date(), userId: user.id, @@ -123,7 +124,9 @@ export default define(meta, async (ps, user) => { withReplies: ps.withReplies, withFile: ps.withFile, notify: ps.notify, - }); + }).then(x => Antennas.findOneOrFail(x.identifiers[0])); + + publishInternalEvent('antennaCreated', antenna); return await Antennas.pack(antenna); }); diff --git a/src/server/api/endpoints/antennas/delete.ts b/src/server/api/endpoints/antennas/delete.ts index 9008c37c8e..adaf2d0d85 100644 --- a/src/server/api/endpoints/antennas/delete.ts +++ b/src/server/api/endpoints/antennas/delete.ts @@ -3,6 +3,7 @@ import { ID } from '../../../../misc/cafy-id'; import define from '../../define'; import { ApiError } from '../../error'; import { Antennas } from '../../../../models'; +import { publishInternalEvent } from '../../../../services/stream'; export const meta = { desc: { @@ -42,4 +43,6 @@ export default define(meta, async (ps, user) => { } await Antennas.delete(antenna.id); + + publishInternalEvent('antennaDeleted', antenna); }); diff --git a/src/server/api/endpoints/antennas/update.ts b/src/server/api/endpoints/antennas/update.ts index 58f4e68c89..22764b7411 100644 --- a/src/server/api/endpoints/antennas/update.ts +++ b/src/server/api/endpoints/antennas/update.ts @@ -3,6 +3,7 @@ import { ID } from '../../../../misc/cafy-id'; import define from '../../define'; import { ApiError } from '../../error'; import { Antennas, UserLists, UserGroupJoinings } from '../../../../models'; +import { publishInternalEvent } from '../../../../services/stream'; export const meta = { desc: { @@ -141,5 +142,7 @@ export default define(meta, async (ps, user) => { notify: ps.notify, }); + publishInternalEvent('antennaUpdated', Antennas.findOneOrFail(antenna.id)); + return await Antennas.pack(antenna.id); }); diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts index 99ae558696..1dcc31d32f 100644 --- a/src/server/api/stream/index.ts +++ b/src/server/api/stream/index.ts @@ -168,17 +168,10 @@ export default class Connection { if (note == null) return; if (this.user && (note.userId !== this.user.id)) { - if (note.mentions && note.mentions.includes(this.user.id)) { - readNote(this.user.id, [note]); - } else if (note.visibleUserIds && note.visibleUserIds.includes(this.user.id)) { - readNote(this.user.id, [note]); - } - - if (this.followingChannels.has(note.channelId)) { - // TODO - } - - // TODO: アンテナの既読処理 + readNote(this.user.id, [note], { + following: this.following, + followingChannels: this.followingChannels, + }); } } diff --git a/src/services/note/create.ts b/src/services/note/create.ts index 4a737e8516..b9c1676c58 100644 --- a/src/services/note/create.ts +++ b/src/services/note/create.ts @@ -33,6 +33,7 @@ import { countSameRenotes } from '../../misc/count-same-renotes'; import { deliverToRelays } from '../relay'; import { Channel } from '../../models/entities/channel'; import { normalizeForSearch } from '../../misc/normalize-for-search'; +import { getAntennas } from '../../misc/antenna-cache'; type NotificationType = 'reply' | 'renote' | 'quote' | 'mention'; @@ -241,6 +242,7 @@ export default async (user: User, data: Option, silent = false) => new Promise { @@ -262,17 +264,15 @@ export default async (user: User, data: Option, silent = false) => new Promise { + .then(async followings => { const followers = followings.map(f => f.followerId); - Antennas.find().then(async antennas => { - for (const antenna of antennas) { - checkHitAntenna(antenna, note, user, followers).then(hit => { - if (hit) { - addNoteToAntenna(antenna, note, user); - } - }); - } - }); + for (const antenna of (await getAntennas())) { + checkHitAntenna(antenna, note, user, followers).then(hit => { + if (hit) { + addNoteToAntenna(antenna, note, user); + } + }); + } }); // Channel diff --git a/src/services/note/read.ts b/src/services/note/read.ts index 35279db411..91b523bd1d 100644 --- a/src/services/note/read.ts +++ b/src/services/note/read.ts @@ -1,23 +1,59 @@ import { publishMainStream } from '../stream'; import { Note } from '../../models/entities/note'; import { User } from '../../models/entities/user'; -import { NoteUnreads, Antennas, AntennaNotes, Users } from '../../models'; +import { NoteUnreads, AntennaNotes, Users } from '../../models'; import { Not, IsNull, In } from 'typeorm'; +import { Channel } from '../../models/entities/channel'; +import { checkHitAntenna } from '../../misc/check-hit-antenna'; +import { getAntennas } from '../../misc/antenna-cache'; +import { PackedNote } from '../../models/repositories/note'; /** * Mark notes as read */ export default async function( userId: User['id'], - noteIds: Note['id'][] + notes: (Note | PackedNote)[], + info: { + following: Set; + followingChannels: Set; + } ) { - async function careNoteUnreads() { + const myAntennas = (await getAntennas()).filter(a => a.userId === userId); + const readMentions: (Note | PackedNote)[] = []; + const readSpecifiedNotes: (Note | PackedNote)[] = []; + const readChannelNotes: (Note | PackedNote)[] = []; + const readAntennaNotes: (Note | PackedNote)[] = []; + + for (const note of notes) { + if (note.mentions && note.mentions.includes(userId)) { + readMentions.push(note); + } else if (note.visibleUserIds && note.visibleUserIds.includes(userId)) { + readSpecifiedNotes.push(note); + } + + if (note.channelId && info.followingChannels.has(note.channelId)) { + readChannelNotes.push(note); + } + + if (note.user != null) { // たぶんnullになることは無いはずだけど一応 + for (const antenna of myAntennas) { + if (checkHitAntenna(antenna, note, note.user as any, undefined, Array.from(info.following))) { + readAntennaNotes.push(note); + } + } + } + } + + if ((readMentions.length > 0) || (readSpecifiedNotes.length > 0) || (readChannelNotes.length > 0)) { // Remove the record await NoteUnreads.delete({ userId: userId, - noteId: In(noteIds), + noteId: In([...readMentions.map(n => n.id), ...readSpecifiedNotes.map(n => n.id), ...readChannelNotes.map(n => n.id)]), }); + // TODO: ↓まとめてクエリしたい + NoteUnreads.count({ userId: userId, isMentioned: true @@ -49,33 +85,25 @@ export default async function( }); } - async function careAntenna() { - const antennas = await Antennas.find({ userId }); + if (readAntennaNotes.length > 0) { + await AntennaNotes.update({ + antennaId: In(myAntennas.map(a => a.id)), + noteId: In(readAntennaNotes.map(n => n.id)) + }, { + read: true + }); - await Promise.all(antennas.map(async antenna => { - const countBefore = await AntennaNotes.count({ + // TODO: まとめてクエリしたい + for (const antenna of myAntennas) { + const count = await AntennaNotes.count({ antennaId: antenna.id, read: false }); - if (countBefore === 0) return; - - await AntennaNotes.update({ - antennaId: antenna.id, - noteId: In(noteIds) - }, { - read: true - }); - - const countAfter = await AntennaNotes.count({ - antennaId: antenna.id, - read: false - }); - - if (countAfter === 0) { + if (count === 0) { publishMainStream(userId, 'readAntenna', antenna); } - })); + } Users.getHasUnreadAntenna(userId).then(unread => { if (!unread) { @@ -83,7 +111,4 @@ export default async function( } }); } - - careNoteUnreads(); - careAntenna(); } diff --git a/src/services/stream.ts b/src/services/stream.ts index a8855b62e6..6d7c2c4922 100644 --- a/src/services/stream.ts +++ b/src/services/stream.ts @@ -20,6 +20,10 @@ class Publisher { })); } + public publishInternalEvent = (type: string, value?: any): void => { + this.publish('internal', type, typeof value === 'undefined' ? null : value); + } + public publishUserEvent = (userId: User['id'], type: string, value?: any): void => { this.publish(`user:${userId}`, type, typeof value === 'undefined' ? null : value); } @@ -88,6 +92,7 @@ const publisher = new Publisher(); export default publisher; +export const publishInternalEvent = publisher.publishInternalEvent; export const publishUserEvent = publisher.publishUserEvent; export const publishBroadcastStream = publisher.publishBroadcastStream; export const publishMainStream = publisher.publishMainStream;