Improve chart performance (#7360)

* wip

* wip

* wip

* wip

* wip

* Update chart.ts

* wip

* Improve server performance

* wip

* wip
This commit is contained in:
syuilo 2021-03-18 11:17:05 +09:00 committed by GitHub
parent 0d19c2d42e
commit 4f249159d3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
32 changed files with 889 additions and 161 deletions

View file

@ -1,5 +1,5 @@
import Xev from 'xev';
import { deliverQueue, inboxQueue } from '../queue';
import { deliverQueue, inboxQueue } from '../queue/queues';
const ev = new Xev();

View file

@ -1,3 +1,7 @@
// https://github.com/typeorm/typeorm/issues/2400
const types = require('pg').types;
types.setTypeParser(20, Number);
import { createConnection, Logger, getConnection } from 'typeorm';
import config from '../config';
import { entities as charts } from '../services/chart/entities';

1
src/global.d.ts vendored Normal file
View file

@ -0,0 +1 @@
type FIXME = any;

View file

@ -0,0 +1,88 @@
// https://gist.github.com/nfantone/1eaa803772025df69d07f4dbf5df7e58
'use strict';
/**
* @callback BeforeShutdownListener
* @param {string} [signalOrEvent] The exit signal or event name received on the process.
*/
/**
* System signals the app will listen to initiate shutdown.
* @const {string[]}
*/
const SHUTDOWN_SIGNALS = ['SIGINT', 'SIGTERM'];
/**
* Time in milliseconds to wait before forcing shutdown.
* @const {number}
*/
const SHUTDOWN_TIMEOUT = 15000;
/**
* A queue of listener callbacks to execute before shutting
* down the process.
* @type {BeforeShutdownListener[]}
*/
const shutdownListeners = [];
/**
* Listen for signals and execute given `fn` function once.
* @param {string[]} signals System signals to listen to.
* @param {function(string)} fn Function to execute on shutdown.
*/
const processOnce = (signals, fn) => {
return signals.forEach(sig => process.once(sig, fn));
};
/**
* Sets a forced shutdown mechanism that will exit the process after `timeout` milliseconds.
* @param {number} timeout Time to wait before forcing shutdown (milliseconds)
*/
const forceExitAfter = timeout => () => {
setTimeout(() => {
// Force shutdown after timeout
console.warn(`Could not close resources gracefully after ${timeout}ms: forcing shutdown`);
return process.exit(1);
}, timeout).unref();
};
/**
* Main process shutdown handler. Will invoke every previously registered async shutdown listener
* in the queue and exit with a code of `0`. Any `Promise` rejections from any listener will
* be logged out as a warning, but won't prevent other callbacks from executing.
* @param {string} signalOrEvent The exit signal or event name received on the process.
*/
async function shutdownHandler(signalOrEvent) {
console.warn(`Shutting down: received [${signalOrEvent}] signal`);
for (const listener of shutdownListeners) {
try {
await listener(signalOrEvent);
} catch (err) {
console.warn(`A shutdown handler failed before completing with: ${err.message || err}`);
}
}
return process.exit(0);
}
/**
* Registers a new shutdown listener to be invoked before exiting
* the main process. Listener handlers are guaranteed to be called in the order
* they were registered.
* @param {BeforeShutdownListener} listener The shutdown listener to register.
* @returns {BeforeShutdownListener} Echoes back the supplied `listener`.
*/
export function beforeShutdown(listener) {
shutdownListeners.push(listener);
return listener;
}
// Register shutdown callback that kills the process after `SHUTDOWN_TIMEOUT` milliseconds
// This prevents custom shutdown handlers from hanging the process indefinitely
processOnce(SHUTDOWN_SIGNALS, forceExitAfter(SHUTDOWN_TIMEOUT));
// Register process shutdown callback
// Will listen to incoming signal events and execute all registered handlers in the stack
processOnce(SHUTDOWN_SIGNALS, shutdownHandler);

View file

@ -1,4 +1,3 @@
import * as Queue from 'bull';
import * as httpSignature from 'http-signature';
import config from '../config';
@ -13,22 +12,7 @@ import { queueLogger } from './logger';
import { DriveFile } from '../models/entities/drive-file';
import { getJobInfo } from './get-job-info';
import { IActivity } from '../remote/activitypub/type';
function initializeQueue(name: string, limitPerSec = -1) {
return new Queue(name, {
redis: {
port: config.redis.port,
host: config.redis.host,
password: config.redis.pass,
db: config.redis.db || 0,
},
prefix: config.redis.prefix ? `${config.redis.prefix}:queue` : 'queue',
limiter: limitPerSec > 0 ? {
max: limitPerSec * 5,
duration: 5000
} : undefined
});
}
import { dbQueue, deliverQueue, inboxQueue, objectStorageQueue } from './queues';
export type InboxJobData = {
activity: IActivity,
@ -44,11 +28,6 @@ function renderError(e: Error): any {
};
}
export const deliverQueue = initializeQueue('deliver', config.deliverJobPerSec || 128);
export const inboxQueue = initializeQueue('inbox', config.inboxJobPerSec || 16);
export const dbQueue = initializeQueue('db');
export const objectStorageQueue = initializeQueue('objectStorage');
const deliverLogger = queueLogger.createSubLogger('deliver');
const inboxLogger = queueLogger.createSubLogger('inbox');
const dbLogger = queueLogger.createSubLogger('db');

18
src/queue/initialize.ts Normal file
View file

@ -0,0 +1,18 @@
import * as Queue from 'bull';
import config from '../config';
export function initialize(name: string, limitPerSec = -1) {
return new Queue(name, {
redis: {
port: config.redis.port,
host: config.redis.host,
password: config.redis.pass,
db: config.redis.db || 0,
},
prefix: config.redis.prefix ? `${config.redis.prefix}:queue` : 'queue',
limiter: limitPerSec > 0 ? {
max: limitPerSec * 5,
duration: 5000
} : undefined
});
}

7
src/queue/queues.ts Normal file
View file

@ -0,0 +1,7 @@
import config from '../config';
import { initialize as initializeQueue } from './initialize';
export const deliverQueue = initializeQueue('deliver', config.deliverJobPerSec || 128);
export const inboxQueue = initializeQueue('inbox', config.inboxJobPerSec || 16);
export const dbQueue = initializeQueue('db');
export const objectStorageQueue = initializeQueue('objectStorage');

View file

@ -17,6 +17,18 @@ export default class ActiveUsersChart extends Chart<ActiveUsersLog> {
return {};
}
@autobind
protected aggregate(logs: ActiveUsersLog[]): ActiveUsersLog {
return {
local: {
users: logs.reduce((a, b) => a.concat(b.local.users), [] as ActiveUsersLog['local']['users']),
},
remote: {
users: logs.reduce((a, b) => a.concat(b.remote.users), [] as ActiveUsersLog['remote']['users']),
},
};
}
@autobind
protected async fetchActual(): Promise<DeepPartial<ActiveUsersLog>> {
return {};
@ -25,11 +37,11 @@ export default class ActiveUsersChart extends Chart<ActiveUsersLog> {
@autobind
public async update(user: User) {
const update: Obj = {
count: 1
users: [user.id]
};
await this.incIfUnique({
await this.inc({
[Users.isLocalUser(user) ? 'local' : 'remote']: update
}, 'users', user.id);
});
}
}

View file

@ -27,6 +27,28 @@ export default class DriveChart extends Chart<DriveLog> {
};
}
@autobind
protected aggregate(logs: DriveLog[]): DriveLog {
return {
local: {
totalCount: logs[0].local.totalCount,
totalSize: logs[0].local.totalSize,
incCount: logs.reduce((a, b) => a + b.local.incCount, 0),
incSize: logs.reduce((a, b) => a + b.local.incSize, 0),
decCount: logs.reduce((a, b) => a + b.local.decCount, 0),
decSize: logs.reduce((a, b) => a + b.local.decSize, 0),
},
remote: {
totalCount: logs[0].remote.totalCount,
totalSize: logs[0].remote.totalSize,
incCount: logs.reduce((a, b) => a + b.remote.incCount, 0),
incSize: logs.reduce((a, b) => a + b.remote.incSize, 0),
decCount: logs.reduce((a, b) => a + b.remote.decCount, 0),
decSize: logs.reduce((a, b) => a + b.remote.decSize, 0),
},
};
}
@autobind
protected async fetchActual(): Promise<DeepPartial<DriveLog>> {
const [localCount, remoteCount, localSize, remoteSize] = await Promise.all([

View file

@ -20,6 +20,17 @@ export default class FederationChart extends Chart<FederationLog> {
};
}
@autobind
protected aggregate(logs: FederationLog[]): FederationLog {
return {
instance: {
total: logs[0].instance.total,
inc: logs.reduce((a, b) => a + b.instance.inc, 0),
dec: logs.reduce((a, b) => a + b.instance.dec, 0),
},
};
}
@autobind
protected async fetchActual(): Promise<DeepPartial<FederationLog>> {
const [total] = await Promise.all([

View file

@ -17,6 +17,18 @@ export default class HashtagChart extends Chart<HashtagLog> {
return {};
}
@autobind
protected aggregate(logs: HashtagLog[]): HashtagLog {
return {
local: {
users: logs.reduce((a, b) => a.concat(b.local.users), [] as HashtagLog['local']['users']),
},
remote: {
users: logs.reduce((a, b) => a.concat(b.remote.users), [] as HashtagLog['remote']['users']),
},
};
}
@autobind
protected async fetchActual(): Promise<DeepPartial<HashtagLog>> {
return {};
@ -25,11 +37,11 @@ export default class HashtagChart extends Chart<HashtagLog> {
@autobind
public async update(hashtag: string, user: User) {
const update: Obj = {
count: 1
users: [user.id]
};
await this.incIfUnique({
await this.inc({
[Users.isLocalUser(user) ? 'local' : 'remote']: update
}, 'users', user.id, hashtag);
}, hashtag);
}
}

View file

@ -36,6 +36,50 @@ export default class InstanceChart extends Chart<InstanceLog> {
};
}
@autobind
protected aggregate(logs: InstanceLog[]): InstanceLog {
return {
requests: {
failed: logs.reduce((a, b) => a + b.requests.failed, 0),
succeeded: logs.reduce((a, b) => a + b.requests.succeeded, 0),
received: logs.reduce((a, b) => a + b.requests.received, 0),
},
notes: {
total: logs[0].notes.total,
inc: logs.reduce((a, b) => a + b.notes.inc, 0),
dec: logs.reduce((a, b) => a + b.notes.dec, 0),
diffs: {
reply: logs.reduce((a, b) => a + b.notes.diffs.reply, 0),
renote: logs.reduce((a, b) => a + b.notes.diffs.renote, 0),
normal: logs.reduce((a, b) => a + b.notes.diffs.normal, 0),
},
},
users: {
total: logs[0].users.total,
inc: logs.reduce((a, b) => a + b.users.inc, 0),
dec: logs.reduce((a, b) => a + b.users.dec, 0),
},
following: {
total: logs[0].following.total,
inc: logs.reduce((a, b) => a + b.following.inc, 0),
dec: logs.reduce((a, b) => a + b.following.dec, 0),
},
followers: {
total: logs[0].followers.total,
inc: logs.reduce((a, b) => a + b.followers.inc, 0),
dec: logs.reduce((a, b) => a + b.followers.dec, 0),
},
drive: {
totalFiles: logs[0].drive.totalFiles,
totalUsage: logs[0].drive.totalUsage,
incFiles: logs.reduce((a, b) => a + b.drive.incFiles, 0),
incUsage: logs.reduce((a, b) => a + b.drive.incUsage, 0),
decFiles: logs.reduce((a, b) => a + b.drive.decFiles, 0),
decUsage: logs.reduce((a, b) => a + b.drive.decUsage, 0),
},
};
}
@autobind
protected async fetchActual(group: string): Promise<DeepPartial<InstanceLog>> {
const [

View file

@ -15,6 +15,17 @@ export default class NetworkChart extends Chart<NetworkLog> {
return {};
}
@autobind
protected aggregate(logs: NetworkLog[]): NetworkLog {
return {
incomingRequests: logs.reduce((a, b) => a + b.incomingRequests, 0),
outgoingRequests: logs.reduce((a, b) => a + b.outgoingRequests, 0),
totalTime: logs.reduce((a, b) => a + b.totalTime, 0),
incomingBytes: logs.reduce((a, b) => a + b.incomingBytes, 0),
outgoingBytes: logs.reduce((a, b) => a + b.outgoingBytes, 0),
};
}
@autobind
protected async fetchActual(): Promise<DeepPartial<NetworkLog>> {
return {};

View file

@ -25,6 +25,32 @@ export default class NotesChart extends Chart<NotesLog> {
};
}
@autobind
protected aggregate(logs: NotesLog[]): NotesLog {
return {
local: {
total: logs[0].local.total,
inc: logs.reduce((a, b) => a + b.local.inc, 0),
dec: logs.reduce((a, b) => a + b.local.dec, 0),
diffs: {
reply: logs.reduce((a, b) => a + b.local.diffs.reply, 0),
renote: logs.reduce((a, b) => a + b.local.diffs.renote, 0),
normal: logs.reduce((a, b) => a + b.local.diffs.normal, 0),
},
},
remote: {
total: logs[0].remote.total,
inc: logs.reduce((a, b) => a + b.remote.inc, 0),
dec: logs.reduce((a, b) => a + b.remote.dec, 0),
diffs: {
reply: logs.reduce((a, b) => a + b.remote.diffs.reply, 0),
renote: logs.reduce((a, b) => a + b.remote.diffs.renote, 0),
normal: logs.reduce((a, b) => a + b.remote.diffs.normal, 0),
},
},
};
}
@autobind
protected async fetchActual(): Promise<DeepPartial<NotesLog>> {
const [localCount, remoteCount] = await Promise.all([

View file

@ -20,6 +20,18 @@ export default class PerUserDriveChart extends Chart<PerUserDriveLog> {
};
}
@autobind
protected aggregate(logs: PerUserDriveLog[]): PerUserDriveLog {
return {
totalCount: logs[0].totalCount,
totalSize: logs[0].totalSize,
incCount: logs.reduce((a, b) => a + b.incCount, 0),
incSize: logs.reduce((a, b) => a + b.incSize, 0),
decCount: logs.reduce((a, b) => a + b.decCount, 0),
decSize: logs.reduce((a, b) => a + b.decSize, 0),
};
}
@autobind
protected async fetchActual(group: string): Promise<DeepPartial<PerUserDriveLog>> {
const [count, size] = await Promise.all([

View file

@ -35,6 +35,36 @@ export default class PerUserFollowingChart extends Chart<PerUserFollowingLog> {
};
}
@autobind
protected aggregate(logs: PerUserFollowingLog[]): PerUserFollowingLog {
return {
local: {
followings: {
total: logs[0].local.followings.total,
inc: logs.reduce((a, b) => a + b.local.followings.inc, 0),
dec: logs.reduce((a, b) => a + b.local.followings.dec, 0),
},
followers: {
total: logs[0].local.followers.total,
inc: logs.reduce((a, b) => a + b.local.followers.inc, 0),
dec: logs.reduce((a, b) => a + b.local.followers.dec, 0),
},
},
remote: {
followings: {
total: logs[0].remote.followings.total,
inc: logs.reduce((a, b) => a + b.remote.followings.inc, 0),
dec: logs.reduce((a, b) => a + b.remote.followings.dec, 0),
},
followers: {
total: logs[0].remote.followers.total,
inc: logs.reduce((a, b) => a + b.remote.followers.inc, 0),
dec: logs.reduce((a, b) => a + b.remote.followers.dec, 0),
},
},
};
}
@autobind
protected async fetchActual(group: string): Promise<DeepPartial<PerUserFollowingLog>> {
const [

View file

@ -20,6 +20,20 @@ export default class PerUserNotesChart extends Chart<PerUserNotesLog> {
};
}
@autobind
protected aggregate(logs: PerUserNotesLog[]): PerUserNotesLog {
return {
total: logs[0].total,
inc: logs.reduce((a, b) => a + b.inc, 0),
dec: logs.reduce((a, b) => a + b.dec, 0),
diffs: {
reply: logs.reduce((a, b) => a + b.diffs.reply, 0),
renote: logs.reduce((a, b) => a + b.diffs.renote, 0),
normal: logs.reduce((a, b) => a + b.diffs.normal, 0),
},
};
}
@autobind
protected async fetchActual(group: string): Promise<DeepPartial<PerUserNotesLog>> {
const [count] = await Promise.all([

View file

@ -18,6 +18,18 @@ export default class PerUserReactionsChart extends Chart<PerUserReactionsLog> {
return {};
}
@autobind
protected aggregate(logs: PerUserReactionsLog[]): PerUserReactionsLog {
return {
local: {
count: logs.reduce((a, b) => a + b.local.count, 0),
},
remote: {
count: logs.reduce((a, b) => a + b.remote.count, 0),
},
};
}
@autobind
protected async fetchActual(group: string): Promise<DeepPartial<PerUserReactionsLog>> {
return {};

View file

@ -21,6 +21,17 @@ export default class TestGroupedChart extends Chart<TestGroupedLog> {
};
}
@autobind
protected aggregate(logs: TestGroupedLog[]): TestGroupedLog {
return {
foo: {
total: logs[0].foo.total,
inc: logs.reduce((a, b) => a + b.foo.inc, 0),
dec: logs.reduce((a, b) => a + b.foo.dec, 0),
},
};
}
@autobind
protected async fetchActual(group: string): Promise<DeepPartial<TestGroupedLog>> {
return {

View file

@ -15,6 +15,13 @@ export default class TestUniqueChart extends Chart<TestUniqueLog> {
return {};
}
@autobind
protected aggregate(logs: TestUniqueLog[]): TestUniqueLog {
return {
foo: logs.reduce((a, b) => a.concat(b.foo), [] as TestUniqueLog['foo']),
};
}
@autobind
protected async fetchActual(): Promise<DeepPartial<TestUniqueLog>> {
return {};
@ -22,8 +29,8 @@ export default class TestUniqueChart extends Chart<TestUniqueLog> {
@autobind
public async uniqueIncrement(key: string) {
await this.incIfUnique({
foo: 1
}, 'foos', key);
await this.inc({
foo: [key]
});
}
}

View file

@ -21,6 +21,17 @@ export default class TestChart extends Chart<TestLog> {
};
}
@autobind
protected aggregate(logs: TestLog[]): TestLog {
return {
foo: {
total: logs[0].foo.total,
inc: logs.reduce((a, b) => a + b.foo.inc, 0),
dec: logs.reduce((a, b) => a + b.foo.dec, 0),
},
};
}
@autobind
protected async fetchActual(): Promise<DeepPartial<TestLog>> {
return {

View file

@ -25,6 +25,22 @@ export default class UsersChart extends Chart<UsersLog> {
};
}
@autobind
protected aggregate(logs: UsersLog[]): UsersLog {
return {
local: {
total: logs[0].local.total,
inc: logs.reduce((a, b) => a + b.local.inc, 0),
dec: logs.reduce((a, b) => a + b.local.dec, 0),
},
remote: {
total: logs[0].remote.total,
inc: logs.reduce((a, b) => a + b.remote.inc, 0),
dec: logs.reduce((a, b) => a + b.remote.dec, 0),
},
};
}
@autobind
protected async fetchActual(): Promise<DeepPartial<UsersLog>> {
const [localCount, remoteCount] = await Promise.all([

View file

@ -1,11 +1,15 @@
export const logSchema = {
/**
*
*
*/
count: {
type: 'number' as const,
users: {
type: 'array' as const,
optional: false as const, nullable: false as const,
description: 'アクティブユーザー数',
description: 'アクティブユーザー',
items: {
type: 'string' as const,
optional: false as const, nullable: false as const,
}
},
};

View file

@ -1,11 +1,15 @@
export const logSchema = {
/**
* 稿
* 稿
*/
count: {
type: 'number' as const,
users: {
type: 'array' as const,
optional: false as const, nullable: false as const,
description: '投稿された数',
description: '投稿したユーザー',
items: {
type: 'string' as const,
optional: false as const, nullable: false as const,
}
},
};

View file

@ -3,9 +3,12 @@ export const schema = {
optional: false as const, nullable: false as const,
properties: {
foo: {
type: 'number' as const,
type: 'array' as const,
optional: false as const, nullable: false as const,
description: ''
items: {
type: 'string' as const,
optional: false as const, nullable: false as const,
}
},
}
};

View file

@ -24,8 +24,6 @@ type ArrayValue<T> = {
[P in keyof T]: T[P] extends number ? T[P][] : ArrayValue<T[P]>;
};
type Span = 'day' | 'hour';
type Log = {
id: number;
@ -38,22 +36,14 @@ type Log = {
* Unixタイムスタンプ()
*/
date: number;
/**
*
*/
span: Span;
/**
*
*/
unique?: Record<string, any>;
};
const camelToSnake = (str: string) => {
return str.replace(/([A-Z])/g, s => '_' + s.charAt(0).toLowerCase());
};
const removeDuplicates = (array: any[]) => Array.from(new Set(array));
/**
*
*/
@ -62,10 +52,21 @@ export default abstract class Chart<T extends Record<string, any>> {
private static readonly columnDot = '_';
private name: string;
private queue: {
diff: DeepPartial<T>;
group: string | null;
}[] = [];
public schema: Schema;
protected repository: Repository<Log>;
protected abstract genNewLog(latest: T): DeepPartial<T>;
protected abstract async fetchActual(group: string | null): Promise<DeepPartial<T>>;
/**
* @param logs
*/
protected abstract aggregate(logs: T[]): T;
protected abstract fetchActual(group: string | null): Promise<DeepPartial<T>>;
@autobind
private static convertSchemaToFlatColumnDefinitions(schema: Schema) {
@ -75,10 +76,15 @@ export default abstract class Chart<T extends Record<string, any>> {
const p = path ? `${path}${this.columnDot}${k}` : k;
if (v.type === 'object') {
flatColumns(v.properties, p);
} else {
} else if (v.type === 'number') {
columns[this.columnPrefix + p] = {
type: 'bigint',
};
} else if (v.type === 'array' && v.items.type === 'string') {
columns[this.columnPrefix + p] = {
type: 'varchar',
array: true,
};
}
}
};
@ -99,11 +105,11 @@ export default abstract class Chart<T extends Record<string, any>> {
@autobind
private static convertObjectToFlattenColumns(x: Record<string, any>) {
const columns = {} as Record<string, number>;
const columns = {} as Record<string, number | unknown[]>;
const flatten = (x: Obj, path?: string) => {
for (const [k, v] of Object.entries(x)) {
const p = path ? `${path}${this.columnDot}${k}` : k;
if (typeof v === 'object') {
if (typeof v === 'object' && !Array.isArray(v)) {
flatten(v, p);
} else {
columns[this.columnPrefix + p] = v;
@ -115,14 +121,37 @@ export default abstract class Chart<T extends Record<string, any>> {
}
@autobind
private static convertQuery(x: Record<string, any>) {
private static countUniqueFields(x: Record<string, any>) {
const exec = (x: Obj) => {
const res = {} as Record<string, any>;
for (const [k, v] of Object.entries(x)) {
if (typeof v === 'object' && !Array.isArray(v)) {
res[k] = exec(v);
} else if (Array.isArray(v)) {
res[k] = Array.from(new Set(v)).length;
} else {
res[k] = v;
}
}
return res;
};
return exec(x);
}
@autobind
private static convertQuery(diff: Record<string, number | unknown[]>) {
const query: Record<string, Function> = {};
const columns = Chart.convertObjectToFlattenColumns(x);
for (const [k, v] of Object.entries(columns)) {
if (v > 0) query[k] = () => `"${k}" + ${v}`;
if (v < 0) query[k] = () => `"${k}" - ${Math.abs(v)}`;
for (const [k, v] of Object.entries(diff)) {
if (typeof v === 'number') {
if (v > 0) query[k] = () => `"${k}" + ${v}`;
if (v < 0) query[k] = () => `"${k}" - ${Math.abs(v)}`;
} else if (Array.isArray(v)) {
// TODO: item が文字列以外の場合も対応
// TODO: item をSQLエスケープ
const items = v.map(item => `"${item}"`).join(',');
query[k] = () => `array_cat("${k}", '{${items}}'::varchar[])`;
}
}
return query;
@ -169,28 +198,14 @@ export default abstract class Chart<T extends Record<string, any>> {
length: 128,
nullable: true
},
span: {
type: 'enum',
enum: ['hour', 'day']
},
unique: {
type: 'jsonb',
default: {}
},
...Chart.convertSchemaToFlatColumnDefinitions(schema)
},
indices: [{
columns: ['date']
}, {
columns: ['span']
}, {
columns: ['group']
}, {
columns: ['span', 'date']
}, {
columns: ['date', 'group']
}, {
columns: ['span', 'date', 'group']
}]
});
}
@ -200,7 +215,7 @@ export default abstract class Chart<T extends Record<string, any>> {
this.schema = schema;
const entity = Chart.schemaToEntity(name, schema);
const keys = ['span', 'date'];
const keys = ['date'];
if (grouped) keys.push('group');
entity.options.uniques = [{
@ -220,7 +235,8 @@ export default abstract class Chart<T extends Record<string, any>> {
flatColumns(v.properties, p);
} else {
if (nestedProperty.get(log, p) == null) {
nestedProperty.set(log, p, 0);
const emptyValue = v.type === 'number' ? 0 : [];
nestedProperty.set(log, p, emptyValue);
}
}
}
@ -230,10 +246,9 @@ export default abstract class Chart<T extends Record<string, any>> {
}
@autobind
private getLatestLog(span: Span, group: string | null = null): Promise<Log | null> {
private getLatestLog(group: string | null = null): Promise<Log | null> {
return this.repository.findOne({
group: group,
span: span
}, {
order: {
date: -1
@ -242,17 +257,13 @@ export default abstract class Chart<T extends Record<string, any>> {
}
@autobind
private async getCurrentLog(span: Span, group: string | null = null): Promise<Log> {
private async getCurrentLog(group: string | null = null): Promise<Log> {
const [y, m, d, h] = Chart.getCurrentDate();
const current =
span == 'day' ? dateUTC([y, m, d, 0]) :
span == 'hour' ? dateUTC([y, m, d, h]) :
null as never;
const current = dateUTC([y, m, d, h]);
// 現在(今日または今のHour)のログ
// 現在(=今のHour)のログ
const currentLog = await this.repository.findOne({
span: span,
date: Chart.dateToTimestamp(current),
...(group ? { group: group } : {})
});
@ -271,7 +282,7 @@ export default abstract class Chart<T extends Record<string, any>> {
// * 昨日何もチャートを更新するような出来事がなかった場合は、
// * ログがそもそも作られずドキュメントが存在しないということがあり得るため、
// * 「昨日の」と決め打ちせずに「もっとも最近の」とします
const latest = await this.getLatestLog(span, group);
const latest = await this.getLatestLog(group);
if (latest != null) {
const obj = Chart.convertFlattenColumnsToObject(
@ -286,17 +297,16 @@ export default abstract class Chart<T extends Record<string, any>> {
// 初期ログデータを作成
data = this.getNewLog(null);
logger.info(`${this.name + (group ? `:${group}` : '')} (${span}): Initial commit created`);
logger.info(`${this.name + (group ? `:${group}` : '')}: Initial commit created`);
}
const date = Chart.dateToTimestamp(current);
const lockKey = `${this.name}:${date}:${group}:${span}`;
const lockKey = `${this.name}:${date}:${group}`;
const unlock = await getChartInsertLock(lockKey);
try {
// ロック内でもう1回チェックする
const currentLog = await this.repository.findOne({
span: span,
date: date,
...(group ? { group: group } : {})
});
@ -307,12 +317,11 @@ export default abstract class Chart<T extends Record<string, any>> {
// 新規ログ挿入
log = await this.repository.save({
group: group,
span: span,
date: date,
...Chart.convertObjectToFlattenColumns(data)
});
logger.info(`${this.name + (group ? `:${group}` : '')} (${span}): New commit created`);
logger.info(`${this.name + (group ? `:${group}` : '')}: New commit created`);
return log;
} finally {
@ -321,38 +330,62 @@ export default abstract class Chart<T extends Record<string, any>> {
}
@autobind
protected commit(query: Record<string, Function>, group: string | null = null, uniqueKey?: string, uniqueValue?: string): Promise<any> {
const update = async (log: Log) => {
// ユニークインクリメントの場合、指定のキーに指定の値が既に存在していたら弾く
if (
uniqueKey && log.unique &&
log.unique[uniqueKey] &&
log.unique[uniqueKey].includes(uniqueValue)
) return;
protected commit(diff: DeepPartial<T>, group: string | null = null): void {
this.queue.push({
diff, group,
});
}
// ユニークインクリメントの指定のキーに値を追加
if (uniqueKey && log.unique) {
if (log.unique[uniqueKey]) {
const sql = `jsonb_set("unique", '{${uniqueKey}}', ("unique"->>'${uniqueKey}')::jsonb || '["${uniqueValue}"]'::jsonb)`;
query['unique'] = () => sql;
} else {
const sql = `jsonb_set("unique", '{${uniqueKey}}', '["${uniqueValue}"]')`;
query['unique'] = () => sql;
@autobind
public async save() {
if (this.queue.length === 0) {
logger.info(`${this.name}: Write skipped`);
return;
}
// TODO: 前の時間のログがqueueにあった場合のハンドリング
// 例えば、save が20分ごとに行われるとして、前回行われたのは 01:50 だったとする。
// 次に save が行われるのは 02:10 ということになるが、もし 01:55 に新規ログが queue に追加されたとすると、
// そのログは本来は 01:00~ のログとしてDBに保存されて欲しいのに、02:00~ のログ扱いになってしまう。
// これを回避するための実装は複雑になりそうなため、一旦保留。
const update = async (log: Log) => {
const finalDiffs = {} as Record<string, number | unknown[]>;
for (const diff of this.queue.filter(q => q.group === log.group).map(q => q.diff)) {
const columns = Chart.convertObjectToFlattenColumns(diff);
for (const [k, v] of Object.entries(columns)) {
if (finalDiffs[k] == null) {
finalDiffs[k] = v;
} else {
if (typeof finalDiffs[k] === 'number') {
(finalDiffs[k] as number) += v as number;
} else {
(finalDiffs[k] as unknown[]) = (finalDiffs[k] as unknown[]).concat(v);
}
}
}
}
const query = Chart.convertQuery(finalDiffs);
// ログ更新
await this.repository.createQueryBuilder()
.update()
.set(query)
.where('id = :id', { id: log.id })
.execute();
logger.info(`${this.name + (log.group ? `:${log.group}` : '')}: Updated`);
// TODO: この一連の処理が始まった後に新たにqueueに入ったものは消さないようにする
this.queue = this.queue.filter(q => q.group !== log.group);
};
return Promise.all([
this.getCurrentLog('day', group).then(log => update(log)),
this.getCurrentLog('hour', group).then(log => update(log)),
]);
const groups = removeDuplicates(this.queue.map(log => log.group));
await Promise.all(groups.map(group => this.getCurrentLog(group).then(log => update(log))));
}
@autobind
@ -367,39 +400,30 @@ export default abstract class Chart<T extends Record<string, any>> {
.execute();
};
return Promise.all([
this.getCurrentLog('day', group).then(log => update(log)),
this.getCurrentLog('hour', group).then(log => update(log)),
]);
return this.getCurrentLog(group).then(log => update(log));
}
@autobind
protected async inc(inc: DeepPartial<T>, group: string | null = null): Promise<void> {
await this.commit(Chart.convertQuery(inc as any), group);
await this.commit(inc, group);
}
@autobind
protected async incIfUnique(inc: DeepPartial<T>, key: string, value: string, group: string | null = null): Promise<void> {
await this.commit(Chart.convertQuery(inc as any), group, key, value);
}
@autobind
public async getChart(span: Span, amount: number, begin: Date | null, group: string | null = null): Promise<ArrayValue<T>> {
const [y, m, d, h, _m, _s, _ms] = begin ? Chart.parseDate(subtractTime(addTime(begin, 1, span), 1)) : Chart.getCurrentDate();
const [y2, m2, d2, h2] = begin ? Chart.parseDate(addTime(begin, 1, span)) : [] as never;
public async getChart(span: 'hour' | 'day', amount: number, cursor: Date | null, group: string | null = null): Promise<ArrayValue<T>> {
const [y, m, d, h, _m, _s, _ms] = cursor ? Chart.parseDate(subtractTime(addTime(cursor, 1, span), 1)) : Chart.getCurrentDate();
const [y2, m2, d2, h2] = cursor ? Chart.parseDate(addTime(cursor, 1, span)) : [] as never;
const lt = dateUTC([y, m, d, h, _m, _s, _ms]);
const gt =
span === 'day' ? subtractTime(begin ? dateUTC([y2, m2, d2, 0]) : dateUTC([y, m, d, 0]), amount - 1, 'day') :
span === 'hour' ? subtractTime(begin ? dateUTC([y2, m2, d2, h2]) : dateUTC([y, m, d, h]), amount - 1, 'hour') :
span === 'day' ? subtractTime(cursor ? dateUTC([y2, m2, d2, 0]) : dateUTC([y, m, d, 0]), amount - 1, 'day') :
span === 'hour' ? subtractTime(cursor ? dateUTC([y2, m2, d2, h2]) : dateUTC([y, m, d, h]), amount - 1, 'hour') :
null as never;
// ログ取得
let logs = await this.repository.find({
where: {
group: group,
span: span,
date: Between(Chart.dateToTimestamp(gt), Chart.dateToTimestamp(lt))
},
order: {
@ -413,7 +437,6 @@ export default abstract class Chart<T extends Record<string, any>> {
// (すくなくともひとつログが無いと隙間埋めできないため)
const recentLog = await this.repository.findOne({
group: group,
span: span
}, {
order: {
date: -1
@ -430,7 +453,6 @@ export default abstract class Chart<T extends Record<string, any>> {
// (隙間埋めできないため)
const outdatedLog = await this.repository.findOne({
group: group,
span: span,
date: LessThan(Chart.dateToTimestamp(gt))
}, {
order: {
@ -445,23 +467,56 @@ export default abstract class Chart<T extends Record<string, any>> {
const chart: T[] = [];
// 整形
for (let i = (amount - 1); i >= 0; i--) {
const current =
span === 'day' ? subtractTime(dateUTC([y, m, d, 0]), i, 'day') :
span === 'hour' ? subtractTime(dateUTC([y, m, d, h]), i, 'hour') :
null as never;
if (span === 'hour') {
for (let i = (amount - 1); i >= 0; i--) {
const current = subtractTime(dateUTC([y, m, d, h]), i, 'hour');
const log = logs.find(l => isTimeSame(new Date(l.date * 1000), current));
const log = logs.find(l => isTimeSame(new Date(l.date * 1000), current));
if (log) {
const data = Chart.convertFlattenColumnsToObject(log as Record<string, any>);
chart.unshift(data);
} else {
// 隙間埋め
const latest = logs.find(l => isTimeBefore(new Date(l.date * 1000), current));
const data = latest ? Chart.convertFlattenColumnsToObject(latest as Record<string, any>) : null;
chart.unshift(this.getNewLog(data));
if (log) {
const data = Chart.convertFlattenColumnsToObject(log as Record<string, any>);
chart.unshift(Chart.countUniqueFields(data));
} else {
// 隙間埋め
const latest = logs.find(l => isTimeBefore(new Date(l.date * 1000), current));
const data = latest ? Chart.convertFlattenColumnsToObject(latest as Record<string, any>) : null;
chart.unshift(Chart.countUniqueFields(this.getNewLog(data)));
}
}
} else if (span === 'day') {
const logsForEachDays: T[][] = [];
let currentDay = -1;
let currentDayIndex = -1;
for (let i = ((amount - 1) * 24) + h; i >= 0; i--) {
const current = subtractTime(dateUTC([y, m, d, h]), i, 'hour');
const _currentDay = Chart.parseDate(current)[2];
if (currentDay != _currentDay) currentDayIndex++;
currentDay = _currentDay;
const log = logs.find(l => isTimeSame(new Date(l.date * 1000), current));
if (log) {
if (logsForEachDays[currentDayIndex]) {
logsForEachDays[currentDayIndex].unshift(Chart.convertFlattenColumnsToObject(log));
} else {
logsForEachDays[currentDayIndex] = [Chart.convertFlattenColumnsToObject(log)];
}
} else {
// 隙間埋め
const latest = logs.find(l => isTimeBefore(new Date(l.date * 1000), current));
const data = latest ? Chart.convertFlattenColumnsToObject(latest as Record<string, any>) : null;
const newLog = this.getNewLog(data);
if (logsForEachDays[currentDayIndex]) {
logsForEachDays[currentDayIndex].unshift(newLog);
} else {
logsForEachDays[currentDayIndex] = [newLog];
}
}
}
for (const logs of logsForEachDays) {
const log = this.aggregate(logs);
chart.unshift(Chart.countUniqueFields(log));
}
}
@ -473,20 +528,19 @@ export default abstract class Chart<T extends Record<string, any>> {
* { foo: [1, 2, 3], bar: [5, 6, 7] }
*
*/
const dive = (x: Obj, path?: string) => {
const compact = (x: Obj, path?: string) => {
for (const [k, v] of Object.entries(x)) {
const p = path ? `${path}.${k}` : k;
if (typeof v == 'object') {
dive(v, p);
if (typeof v === 'object' && !Array.isArray(v)) {
compact(v, p);
} else {
const values = chart.map(s => nestedProperty.get(s, p))
.map(v => parseInt(v, 10)); // TypeORMのバグ()で何故か数値カラムの値が文字列型になっているので数値に戻す
const values = chart.map(s => nestedProperty.get(s, p));
nestedProperty.set(res, p, values);
}
}
};
dive(chart[0]);
compact(chart[0]);
return res;
}

View file

@ -10,6 +10,7 @@ import PerUserReactionsChart from './charts/classes/per-user-reactions';
import HashtagChart from './charts/classes/hashtag';
import PerUserFollowingChart from './charts/classes/per-user-following';
import PerUserDriveChart from './charts/classes/per-user-drive';
import { beforeShutdown } from '../../misc/before-shutdown';
export const federationChart = new FederationChart();
export const notesChart = new NotesChart();
@ -23,3 +24,27 @@ export const perUserReactionsChart = new PerUserReactionsChart();
export const hashtagChart = new HashtagChart();
export const perUserFollowingChart = new PerUserFollowingChart();
export const perUserDriveChart = new PerUserDriveChart();
const charts = [
federationChart,
notesChart,
usersChart,
networkChart,
activeUsersChart,
instanceChart,
perUserNotesChart,
driveChart,
perUserReactionsChart,
hashtagChart,
perUserFollowingChart,
perUserDriveChart,
];
// 20分おきにメモリ情報をDBに書き込み
setInterval(() => {
for (const chart of charts) {
chart.save();
}
}, 1000 * 60 * 20);
beforeShutdown(() => Promise.all(charts.map(chart => chart.save())));