Compare commits
2 commits
Author | SHA1 | Date | |
---|---|---|---|
|
ce588ac8df | ||
|
51873bbf16 |
1 changed files with 58 additions and 22 deletions
|
@ -4,8 +4,10 @@
|
|||
* Tests located in test/chart
|
||||
*/
|
||||
|
||||
import { promisify } from 'util';
|
||||
import * as nestedProperty from 'nested-property';
|
||||
import autobind from 'autobind-decorator';
|
||||
import { redisClient } from '@/db/redis';
|
||||
import Logger from '../logger';
|
||||
import { Schema } from '@/misc/schema';
|
||||
import { EntitySchema, getRepository, Repository, LessThan, Between } from 'typeorm';
|
||||
|
@ -82,11 +84,6 @@ export default abstract class Chart<T extends Record<string, any>> {
|
|||
columns[this.columnPrefix + p] = {
|
||||
type: 'bigint',
|
||||
};
|
||||
} else if (v.type === 'array' && v.items.type === 'string') {
|
||||
columns[this.columnPrefix + p] = {
|
||||
type: 'varchar',
|
||||
array: true,
|
||||
};
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -107,7 +104,7 @@ export default abstract class Chart<T extends Record<string, any>> {
|
|||
|
||||
@autobind
|
||||
private static convertObjectToFlattenColumns(x: Record<string, unknown>) {
|
||||
const columns = {} as Record<string, number | unknown[]>;
|
||||
const columns = {} as Record<string, unknown>;
|
||||
const flatten = (x: Obj, path?: string) => {
|
||||
for (const [k, v] of Object.entries(x)) {
|
||||
const p = path ? `${path}${this.columnDot}${k}` : k;
|
||||
|
@ -129,8 +126,6 @@ export default abstract class Chart<T extends Record<string, any>> {
|
|||
for (const [k, v] of Object.entries(x)) {
|
||||
if (typeof v === 'object' && !Array.isArray(v)) {
|
||||
res[k] = exec(v);
|
||||
} else if (Array.isArray(v)) {
|
||||
res[k] = Array.from(new Set(v)).length;
|
||||
} else {
|
||||
res[k] = v;
|
||||
}
|
||||
|
@ -141,19 +136,12 @@ export default abstract class Chart<T extends Record<string, any>> {
|
|||
}
|
||||
|
||||
@autobind
|
||||
private static convertQuery(diff: Record<string, number | unknown[]>) {
|
||||
private static convertQuery(diff: Record<string, number>) {
|
||||
const query: Record<string, () => string> = {};
|
||||
|
||||
for (const [k, v] of Object.entries(diff)) {
|
||||
if (typeof v === 'number') {
|
||||
if (v > 0) query[k] = () => `"${k}" + ${v}`;
|
||||
if (v < 0) query[k] = () => `"${k}" - ${Math.abs(v)}`;
|
||||
} else if (Array.isArray(v)) {
|
||||
// TODO: item が文字列以外の場合も対応
|
||||
// TODO: item をSQLエスケープ
|
||||
const items = v.map(item => `"${item}"`).join(',');
|
||||
query[k] = () => `array_cat("${k}", '{${items}}'::varchar[])`;
|
||||
}
|
||||
if (v > 0) query[k] = () => `"${k}" + ${v}`;
|
||||
if (v < 0) query[k] = () => `"${k}" - ${Math.abs(v)}`;
|
||||
}
|
||||
|
||||
return query;
|
||||
|
@ -368,13 +356,59 @@ export default abstract class Chart<T extends Record<string, any>> {
|
|||
});
|
||||
}
|
||||
|
||||
@autobind
|
||||
protected commitUnique(diff: DeepPartial<T>, group: string | null = null): void {
|
||||
const [y, m, d, h] = Chart.getCurrentDate();
|
||||
|
||||
const currentHour = dateUTC([y, m, d, h]);
|
||||
const currentDay = dateUTC([y, m, d]);
|
||||
|
||||
const diffFlat = Chart.convertObjectToFlattenColumns(diff);
|
||||
for (const [k, v] of Object.entries(diffFlat)) {
|
||||
const keyHour = `chart_unique_buffer:${this.name}:${k}:${group || ''}:hour:${currentHour.getTime()}`;
|
||||
const keyDay = `chart_unique_buffer:${this.name}:${k}:${group || ''}:day:${currentDay.getTime()}`;
|
||||
redisClient.sadd(keyHour, v as string);
|
||||
redisClient.sadd(keyDay, v as string);
|
||||
redisClient.sadd(`chart_unique_buffer_index:${this.name}:hour:${currentHour.getTime()}`, keyHour);
|
||||
redisClient.sadd(`chart_unique_buffer_index:${this.name}:day:${currentDay.getTime()}`, keyDay);
|
||||
}
|
||||
}
|
||||
|
||||
@autobind
|
||||
public async save(): Promise<void> {
|
||||
if (this.buffer.length === 0) {
|
||||
logger.info(`${this.name}: Write skipped`);
|
||||
// TODO: 現在のrangeのひとつ前のrangeもbake(claimPreviousLog?)
|
||||
|
||||
const [y, m, d, h] = Chart.getCurrentDate();
|
||||
|
||||
const currentHour = dateUTC([y, m, d, h]);
|
||||
const currentDay = dateUTC([y, m, d]);
|
||||
|
||||
const smembers = promisify(redisClient.smembers).bind(redisClient);
|
||||
const uniqueBufferKeysHour = await smembers(`chart_unique_buffer_index:${this.name}:hour:${currentHour.getTime()}`);
|
||||
const uniqueBufferKeysDay = await smembers(`chart_unique_buffer_index:${this.name}:day:${currentDay.getTime()}`);
|
||||
if (this.buffer.length === 0 && uniqueBufferKeysHour.length === 0 && uniqueBufferKeysDay.length === 9) {
|
||||
logger.info(`${this.name}: No commits. Write skipped`);
|
||||
return;
|
||||
}
|
||||
|
||||
const bakeUniqueBuffer = async (logHour: Log, logDay: Log): Promise<void> => {
|
||||
// この方法だと、100種類のハッシュタグが使われたらbakeごとに100のクエリがredisに送信されることになりそう
|
||||
|
||||
// ログ更新
|
||||
await Promise.all([
|
||||
this.repositoryForHour.createQueryBuilder()
|
||||
.update()
|
||||
.set(query)
|
||||
.where('id = :id', { id: logHour.id })
|
||||
.execute(),
|
||||
this.repositoryForDay.createQueryBuilder()
|
||||
.update()
|
||||
.set(query)
|
||||
.where('id = :id', { id: logDay.id })
|
||||
.execute(),
|
||||
]);
|
||||
};
|
||||
|
||||
// TODO: 前の時間のログがbufferにあった場合のハンドリング
|
||||
// 例えば、save が20分ごとに行われるとして、前回行われたのは 01:50 だったとする。
|
||||
// 次に save が行われるのは 02:10 ということになるが、もし 01:55 に新規ログが buffer に追加されたとすると、
|
||||
|
@ -429,8 +463,10 @@ export default abstract class Chart<T extends Record<string, any>> {
|
|||
Promise.all([
|
||||
this.claimCurrentLog(group, 'hour'),
|
||||
this.claimCurrentLog(group, 'day'),
|
||||
]).then(([logHour, logDay]) =>
|
||||
update(logHour, logDay))));
|
||||
]).then(([logHour, logDay]) => {
|
||||
update(logHour, logDay);
|
||||
bakeUniqueBuffer(logHour, logDay);
|
||||
})));
|
||||
}
|
||||
|
||||
@autobind
|
||||
|
|
Loading…
Reference in a new issue