Re-implement sonic

This commit is contained in:
thatonecalculator 2022-02-10 19:57:27 -08:00
parent 103655db0b
commit 2ce2f4cd95
9 changed files with 359 additions and 108 deletions

View file

@ -39,7 +39,8 @@
"gulp-rename": "2.0.0",
"gulp-replace": "1.1.3",
"gulp-terser": "2.1.0",
"js-yaml": "4.1.0"
"js-yaml": "4.1.0",
"sonic-channel": "1.2.6"
},
"devDependencies": {
"@redocly/openapi-core": "1.0.0-beta.79",

View file

@ -0,0 +1,33 @@
declare module 'sonic-channel' {
type ConnectionListeners = {
connected?: () => void,
disconnected?: (err: Error | null) => void,
timeout?: () => void,
retrying?: () => void,
error?: (err: Error) => void,
};
type ConnectionParams = {
host: string,
port: number,
auth: string | null
};
class Ingest {
constructor(options: ConnectionParams);
public connect(handlers: ConnectionListeners): Ingest;
public close(): Promise<void>;
public push(collection_id: string, bucket_id: string, object_id: string, text: string, lang?: string): Promise<void>;
}
class Search {
constructor(options: ConnectionParams);
public connect(handlers: ConnectionListeners): Search;
public close(): Promise<void>;
public query(collection_id: string, bucket_id: string, terms_text: string, limit?: number, offset?: number, lang?: string): Promise<string[]>;
}
}

View file

@ -24,7 +24,7 @@ export type Source = {
db?: number;
prefix?: string;
};
elasticsearch: {
elasticsearch?: {
host: string;
port: number;
ssl?: boolean;
@ -32,6 +32,12 @@ export type Source = {
pass?: string;
index?: string;
};
sonic?: {
host: string;
port: number;
pass: string;
index?: string;
};
proxy?: string;
proxySmtp?: string;

View file

@ -0,0 +1,3 @@
import { EventEmitter } from 'events';
export class SearchClientBase extends EventEmitter { }

View file

@ -1,7 +1,9 @@
import * as elasticsearch from '@elastic/elasticsearch';
import config from '@/config/index';
import config from '../config';
import { SearchClientBase } from './SearchClientBase';
import { Note } from '../models/entities/note';
const index = {
const indexData = {
settings: {
analysis: {
analyzer: {
@ -30,27 +32,107 @@ const index = {
},
};
// Init ElasticSearch connection
const client = config.elasticsearch ? new elasticsearch.Client({
node: `${config.elasticsearch.ssl ? 'https://' : 'http://'}${config.elasticsearch.host}:${config.elasticsearch.port}`,
auth: (config.elasticsearch.user && config.elasticsearch.pass) ? {
username: config.elasticsearch.user,
password: config.elasticsearch.pass,
} : undefined,
pingTimeout: 30000,
}) : null;
class ElasticSearch extends SearchClientBase {
public index = 'misskey_note';
constructor(address: string, index?: string) {
super();
this.index = index || 'misskey_note';
// Init ElasticSearch connection
this._client = new elasticsearch.Client({
node: address,
pingTimeout: 30000,
});
if (client) {
client.indices.exists({
index: config.elasticsearch.index || 'misskey_note',
}).then(exist => {
if (!exist.body) {
client.indices.create({
index: config.elasticsearch.index || 'misskey_note',
body: index,
this._client.indices
.exists({
index: this.index,
})
.then(exist => {
if (!exist.body) {
this._client.indices.create({
index: this.index,
body: indexData,
});
}
});
}
private _client: elasticsearch.Client;
public available = true;
public search(
content: string,
qualifiers: {userId?: string | null; userHost?: string | null} = {},
limit?: number,
offset?: number,
) {
const queries: any[] = [
{
simple_query_string: {
fields: ['text'],
query: content.toLowerCase(),
default_operator: 'and',
},
},
];
if (qualifiers.userId) {
queries.push({
term: { userId: qualifiers.userId },
});
} else if (qualifiers.userHost !== undefined) {
if (qualifiers.userHost === null) {
queries.push({
bool: {
must_not: {
exists: {
field: 'userHost',
},
},
},
});
} else {
queries.push({
term: {
userHost: qualifiers.userHost,
},
});
}
}
});
return this._client
.search({
index: this.index,
body: {
size: limit,
from: offset,
query: {
bool: {
must: queries,
},
},
},
})
.then(result => result.body.hits.hits.map((hit: any) => hit._id));
}
public push(note: Note) {
return this._client.index({
index: this.index,
id: note.id.toString(),
body: {
text: String(note.text).toLowerCase(),
userId: note.userId,
userHost: note.userHost,
},
});
}
}
export default client;
export default (config.elasticsearch
? new ElasticSearch(
`${config.elasticsearch.ssl ? 'https://' : 'http://'}${config.elasticsearch.host}:${config.elasticsearch.port}`,
config.elasticsearch.index,
)
: null);

View file

@ -0,0 +1,11 @@
import sonic from './sonic';
import es from './elasticsearch';
// This file is just to make it easier to add new drivers in the future, simply import searchClient and whatever driver is available is used
export const clients = [sonic, es];
const client = clients.find(client => client && client.available) || null;
export default client;

View file

@ -0,0 +1,158 @@
import * as Sonic from 'sonic-channel';
import config from '../config';
import { SearchClientBase } from './SearchClientBase';
import { Note } from '../models/entities/note';
export class SonicDriver extends SearchClientBase {
public available = true;
public index = 'misskey_note';
public locale = 'none';
public _ingestQueue: (() => Promise<void>)[] = [];
public _searchQueue: (() => Promise<void>)[] = [];
public _searchReady = false;
public _ingestReady = false;
public _ingestClient: Sonic.Ingest;
public _searchClient: Sonic.Search;
constructor(connectionArgs: {
host: string;
port: number;
auth: string | null;
}, index?: string) {
super();
// Bad!
const self = this;
this.index = index || 'misskey_note';
this._ingestClient = new Sonic.Ingest(connectionArgs).connect({
connected() {
// execute queue of queries
self._runIngestQueue();
self._ingestReady = true;
self._emitReady();
},
disconnected() {
self._ingestReady = false;
self.emit('disconnected');
},
timeout() { },
retrying() { },
error(err: Error) {
self.emit('error', err);
},
});
self._searchClient = new Sonic.Search(connectionArgs).connect({
connected() {
// execute queue of queries
self._runSearchQueue();
self._searchReady = true;
self._emitReady();
},
disconnected() {
self._searchReady = false;
self.emit('disconnected');
},
timeout() { },
retrying() { },
error(err: Error) {
self.emit('error', err);
},
});
}
get ready() {
return this._searchReady && this._ingestReady;
}
public _emitReady() {
if (this.ready) this.emit('ready');
}
public async disconnect() {
return await Promise.all([
this._searchClient.close(),
this._ingestClient.close(),
]);
}
public search(
content: string,
qualifiers: { userId?: string | null; userHost?: string | null } = {},
limit: number = 20,
offset?: number,
locale?: string,
) {
const doSearch = () =>
this._searchClient.query(
this.index,
pickQualifier(qualifiers),
content,
limit,
offset,
locale || this.locale,
);
if (this._searchReady) {
return doSearch();
} else {
return new Promise((resolve, reject) => {
this._searchQueue.push(() =>
doSearch()
.then(resolve)
.catch(reject),
);
});
}
}
public push(note: Note) {
const doIngest = () => {
return Promise.all(
['userId-' + note.userId, 'userHost-' + note.userHost, 'default']
.map((bucket: string) =>
this._ingestClient.push(
this.index,
bucket,
note.id,
String(note.text).toLowerCase(),
this.locale,
),
),
);
};
if (this._ingestReady) {
return doIngest();
} else {
return new Promise((resolve, reject) => {
this._ingestQueue.push(() =>
doIngest()
.then(resolve)
.catch(reject),
);
});
}
}
public _runIngestQueue() {
return Promise.all(this._ingestQueue.map(cb => cb()));
}
public _runSearchQueue() {
return Promise.all(this._searchQueue.map(cb => cb()));
}
}
function pickQualifier(qualifiers: { userId?: string | null; userHost?: string | null }) {
if (qualifiers.userId) return 'userId-' + qualifiers.userId;
else if (qualifiers.userHost) return 'userHost-' + qualifiers.userHost;
else return 'default';
}
export default (config.sonic
? new SonicDriver({
host: config.sonic.host,
port: config.sonic.port,
auth: config.sonic.pass == undefined ? null : config.sonic.pass
}, config.sonic.index)
: null);

View file

@ -1,17 +1,17 @@
import $ from 'cafy';
import es from '../../../../db/elasticsearch';
import define from '../../define';
import { Notes } from '@/models/index';
import { In } from 'typeorm';
import { ID } from '@/misc/cafy-id';
import config from '@/config/index';
import { makePaginationQuery } from '../../common/make-pagination-query';
import { generateVisibilityQuery } from '../../common/generate-visibility-query';
import { generateMutedUserQuery } from '../../common/generate-muted-user-query';
import { generateBlockedUserQuery } from '../../common/generate-block-query';
import $ from "cafy";
import searchClient from "../../../../db/searchClient";
import define from "../../define";
import { Notes } from "@/models/index";
import { In } from "typeorm";
import { ID } from "@/misc/cafy-id";
import config from "@/config/index";
import { makePaginationQuery } from "../../common/make-pagination-query";
import { generateVisibilityQuery } from "../../common/generate-visibility-query";
import { generateMutedUserQuery } from "../../common/generate-muted-user-query";
import { generateBlockedUserQuery } from "../../common/generate-block-query";
export const meta = {
tags: ['notes'],
tags: ["notes"],
requireCredential: false,
@ -50,37 +50,44 @@ export const meta = {
},
res: {
type: 'array',
optional: false, nullable: false,
type: "array",
optional: false,
nullable: false,
items: {
type: 'object',
optional: false, nullable: false,
ref: 'Note',
type: "object",
optional: false,
nullable: false,
ref: "Note",
},
},
errors: {
},
errors: {},
} as const;
// eslint-disable-next-line import/no-default-export
export default define(meta, async (ps, me) => {
if (es == null) {
const query = makePaginationQuery(Notes.createQueryBuilder('note'), ps.sinceId, ps.untilId);
if (searchClient == null) {
const query = makePaginationQuery(
Notes.createQueryBuilder("note"),
ps.sinceId,
ps.untilId
);
if (ps.userId) {
query.andWhere('note.userId = :userId', { userId: ps.userId });
query.andWhere("note.userId = :userId", { userId: ps.userId });
} else if (ps.channelId) {
query.andWhere('note.channelId = :channelId', { channelId: ps.channelId });
query.andWhere("note.channelId = :channelId", {
channelId: ps.channelId,
});
}
query
.andWhere('note.text ILIKE :q', { q: `%${ps.query}%` })
.innerJoinAndSelect('note.user', 'user')
.leftJoinAndSelect('note.reply', 'reply')
.leftJoinAndSelect('note.renote', 'renote')
.leftJoinAndSelect('reply.user', 'replyUser')
.leftJoinAndSelect('renote.user', 'renoteUser');
.andWhere("note.text ILIKE :q", { q: `%${ps.query}%` })
.innerJoinAndSelect("note.user", "user")
.leftJoinAndSelect("note.reply", "reply")
.leftJoinAndSelect("note.renote", "renote")
.leftJoinAndSelect("reply.user", "replyUser")
.leftJoinAndSelect("renote.user", "renoteUser");
generateVisibilityQuery(query, me);
if (me) generateMutedUserQuery(query, me);
@ -90,55 +97,13 @@ export default define(meta, async (ps, me) => {
return await Notes.packMany(notes, me);
} else {
const userQuery = ps.userId != null ? [{
term: {
userId: ps.userId,
},
}] : [];
const hostQuery = ps.userId == null ?
ps.host === null ? [{
bool: {
must_not: {
exists: {
field: 'userHost',
},
},
},
}] : ps.host !== undefined ? [{
term: {
userHost: ps.host,
},
}] : []
: [];
const result = await es.search({
index: config.elasticsearch.index || 'misskey_note',
body: {
size: ps.limit!,
from: ps.offset,
query: {
bool: {
must: [{
simple_query_string: {
fields: ['text'],
query: ps.query.toLowerCase(),
default_operator: 'and',
},
}, ...hostQuery, ...userQuery],
},
},
sort: [{
_doc: 'desc',
}],
},
const hits = await searchClient.search(ps.query, {
userHost: ps.host,
userId: ps.userId,
});
const hits = result.body.hits.hits.map((hit: any) => hit._id);
if (hits.length === 0) return [];
// Fetch found notes
const notes = await Notes.find({
where: {
id: In(hits),

View file

@ -1,5 +1,5 @@
import * as mfm from 'mfm-js';
import es from '../../db/elasticsearch';
import searchClient from "../../db/searchClient";
import { publishMainStream, publishNotesStream } from '@/services/stream';
import DeliverManager from '@/remote/activitypub/deliver-manager';
import renderNote from '@/remote/activitypub/renderer/note';
@ -552,17 +552,9 @@ async function insertNote(user: { id: User['id']; host: User['host']; }, data: O
}
function index(note: Note) {
if (note.text == null || config.elasticsearch == null) return;
if (note.text == null || searchClient == null) return;
es!.index({
index: config.elasticsearch.index || 'misskey_note',
id: note.id.toString(),
body: {
text: normalizeForSearch(note.text),
userId: note.userId,
userHost: note.userHost,
},
});
return searchClient.push(note);
}
async function notifyToWatchersOfRenotee(renote: Note, user: { id: User['id']; }, nm: NotificationManager, type: NotificationType) {