This commit is contained in:
syuilo 2022-01-18 23:19:33 +09:00
parent a8fad1b61c
commit 51873bbf16

View file

@ -4,8 +4,10 @@
* Tests located in test/chart
*/
import { promisify } from 'util';
import * as nestedProperty from 'nested-property';
import autobind from 'autobind-decorator';
import { redisClient } from '@/db/redis';
import Logger from '../logger';
import { SimpleSchema } from '@/misc/simple-schema';
import { EntitySchema, getRepository, Repository, LessThan, Between } from 'typeorm';
@ -82,11 +84,6 @@ export default abstract class Chart<T extends Record<string, any>> {
columns[this.columnPrefix + p] = {
type: 'bigint',
};
} else if (v.type === 'array' && v.items.type === 'string') {
columns[this.columnPrefix + p] = {
type: 'varchar',
array: true,
};
}
}
};
@ -107,7 +104,7 @@ export default abstract class Chart<T extends Record<string, any>> {
@autobind
private static convertObjectToFlattenColumns(x: Record<string, unknown>) {
const columns = {} as Record<string, number | unknown[]>;
const columns = {} as Record<string, unknown>;
const flatten = (x: Obj, path?: string) => {
for (const [k, v] of Object.entries(x)) {
const p = path ? `${path}${this.columnDot}${k}` : k;
@ -129,8 +126,6 @@ export default abstract class Chart<T extends Record<string, any>> {
for (const [k, v] of Object.entries(x)) {
if (typeof v === 'object' && !Array.isArray(v)) {
res[k] = exec(v);
} else if (Array.isArray(v)) {
res[k] = Array.from(new Set(v)).length;
} else {
res[k] = v;
}
@ -141,19 +136,12 @@ export default abstract class Chart<T extends Record<string, any>> {
}
@autobind
private static convertQuery(diff: Record<string, number | unknown[]>) {
private static convertQuery(diff: Record<string, number>) {
const query: Record<string, () => string> = {};
for (const [k, v] of Object.entries(diff)) {
if (typeof v === 'number') {
if (v > 0) query[k] = () => `"${k}" + ${v}`;
if (v < 0) query[k] = () => `"${k}" - ${Math.abs(v)}`;
} else if (Array.isArray(v)) {
// TODO: item が文字列以外の場合も対応
// TODO: item をSQLエスケープ
const items = v.map(item => `"${item}"`).join(',');
query[k] = () => `array_cat("${k}", '{${items}}'::varchar[])`;
}
if (v > 0) query[k] = () => `"${k}" + ${v}`;
if (v < 0) query[k] = () => `"${k}" - ${Math.abs(v)}`;
}
return query;
@ -368,13 +356,59 @@ export default abstract class Chart<T extends Record<string, any>> {
});
}
@autobind
protected commitUnique(diff: DeepPartial<T>, group: string | null = null): void {
const [y, m, d, h] = Chart.getCurrentDate();
const currentHour = dateUTC([y, m, d, h]);
const currentDay = dateUTC([y, m, d]);
const diffFlat = Chart.convertObjectToFlattenColumns(diff);
for (const [k, v] of Object.entries(diffFlat)) {
const keyHour = `chart_unique_buffer:${this.name}:${k}:${group || ''}:hour:${currentHour.getTime()}`;
const keyDay = `chart_unique_buffer:${this.name}:${k}:${group || ''}:day:${currentDay.getTime()}`;
redisClient.sadd(keyHour, v as string);
redisClient.sadd(keyDay, v as string);
redisClient.sadd(`chart_unique_buffer_index:${this.name}:hour:${currentHour.getTime()}`, keyHour);
redisClient.sadd(`chart_unique_buffer_index:${this.name}:day:${currentDay.getTime()}`, keyDay);
}
}
@autobind
public async save(): Promise<void> {
if (this.buffer.length === 0) {
logger.info(`${this.name}: Write skipped`);
// TODO: 現在のrangeのひとつ前のrangeもbake(claimPreviousLog?)
const [y, m, d, h] = Chart.getCurrentDate();
const currentHour = dateUTC([y, m, d, h]);
const currentDay = dateUTC([y, m, d]);
const smembers = promisify(redisClient.smembers).bind(redisClient);
const uniqueBufferKeysHour = await smembers(`chart_unique_buffer_index:${this.name}:hour:${currentHour.getTime()}`);
const uniqueBufferKeysDay = await smembers(`chart_unique_buffer_index:${this.name}:day:${currentDay.getTime()}`);
if (this.buffer.length === 0 && uniqueBufferKeysHour.length === 0 && uniqueBufferKeysDay.length === 9) {
logger.info(`${this.name}: No commits. Write skipped`);
return;
}
const bakeUniqueBuffer = async (logHour: Log, logDay: Log): Promise<void> => {
// この方法だと、100種類のハッシュタグが使われたらbakeごとに100のクエリがredisに送信されることになりそう
// ログ更新
await Promise.all([
this.repositoryForHour.createQueryBuilder()
.update()
.set(query)
.where('id = :id', { id: logHour.id })
.execute(),
this.repositoryForDay.createQueryBuilder()
.update()
.set(query)
.where('id = :id', { id: logDay.id })
.execute(),
]);
};
// TODO: 前の時間のログがbufferにあった場合のハンドリング
// 例えば、save が20分ごとに行われるとして、前回行われたのは 01:50 だったとする。
// 次に save が行われるのは 02:10 ということになるが、もし 01:55 に新規ログが buffer に追加されたとすると、
@ -429,8 +463,10 @@ export default abstract class Chart<T extends Record<string, any>> {
Promise.all([
this.claimCurrentLog(group, 'hour'),
this.claimCurrentLog(group, 'day'),
]).then(([logHour, logDay]) =>
update(logHour, logDay))));
]).then(([logHour, logDay]) => {
update(logHour, logDay);
bakeUniqueBuffer(logHour, logDay);
})));
}
@autobind