egirlskey/packages/backend/src/core/MetaService.ts

128 lines
3.2 KiB
TypeScript
Raw Normal View History

2022-09-17 18:27:08 +00:00
import { Inject, Injectable } from '@nestjs/common';
import { DataSource } from 'typeorm';
import Redis from 'ioredis';
2022-09-17 18:27:08 +00:00
import { DI } from '@/di-symbols.js';
import { Meta } from '@/models/entities/Meta.js';
import { GlobalEventService } from '@/core/GlobalEventService.js';
import { bindThis } from '@/decorators.js';
import { StreamMessages } from '@/server/api/stream/types.js';
import type { OnApplicationShutdown } from '@nestjs/common';
2022-09-17 18:27:08 +00:00
@Injectable()
export class MetaService implements OnApplicationShutdown {
2022-09-18 18:11:50 +00:00
private cache: Meta | undefined;
private intervalId: NodeJS.Timer;
2022-09-17 18:27:08 +00:00
constructor(
@Inject(DI.redisForPubsub)
private redisForPubsub: Redis.Redis,
2022-09-17 18:27:08 +00:00
@Inject(DI.db)
private db: DataSource,
private globalEventService: GlobalEventService,
2022-09-17 18:27:08 +00:00
) {
//this.onMessage = this.onMessage.bind(this);
2022-09-17 18:27:08 +00:00
if (process.env.NODE_ENV !== 'test') {
2022-09-18 18:11:50 +00:00
this.intervalId = setInterval(() => {
2022-09-17 18:27:08 +00:00
this.fetch(true).then(meta => {
// fetch内でもセットしてるけど仕様変更の可能性もあるため一応
2022-09-18 18:11:50 +00:00
this.cache = meta;
2022-09-17 18:27:08 +00:00
});
}, 1000 * 60 * 5);
2022-09-17 18:27:08 +00:00
}
this.redisForPubsub.on('message', this.onMessage);
2022-09-17 18:27:08 +00:00
}
@bindThis
2022-09-23 22:12:11 +00:00
private async onMessage(_: string, data: string): Promise<void> {
const obj = JSON.parse(data);
if (obj.channel === 'internal') {
const { type, body } = obj.message as StreamMessages['internal']['payload'];
switch (type) {
case 'metaUpdated': {
this.cache = body;
break;
}
default:
break;
}
}
}
@bindThis
public async fetch(noCache = false): Promise<Meta> {
2022-09-18 18:11:50 +00:00
if (!noCache && this.cache) return this.cache;
2022-09-17 18:27:08 +00:00
return await this.db.transaction(async transactionalEntityManager => {
// 過去のバグでレコードが複数出来てしまっている可能性があるので新しいIDを優先する
const metas = await transactionalEntityManager.find(Meta, {
order: {
id: 'DESC',
},
});
const meta = metas[0];
if (meta) {
2022-09-18 18:11:50 +00:00
this.cache = meta;
2022-09-17 18:27:08 +00:00
return meta;
} else {
// metaが空のときfetchMetaが同時に呼ばれるとここが同時に呼ばれてしまうことがあるのでフェイルセーフなupsertを使う
const saved = await transactionalEntityManager
.upsert(
Meta,
{
id: 'x',
},
['id'],
)
.then((x) => transactionalEntityManager.findOneByOrFail(Meta, x.identifiers[0]));
2022-09-18 18:11:50 +00:00
this.cache = saved;
2022-09-17 18:27:08 +00:00
return saved;
}
});
}
@bindThis
public async update(data: Partial<Meta>): Promise<Meta> {
const updated = await this.db.transaction(async transactionalEntityManager => {
const metas = await transactionalEntityManager.find(Meta, {
order: {
id: 'DESC',
},
});
const meta = metas[0];
if (meta) {
await transactionalEntityManager.update(Meta, meta.id, data);
const metas = await transactionalEntityManager.find(Meta, {
order: {
id: 'DESC',
},
});
return metas[0];
} else {
return await transactionalEntityManager.save(Meta, data);
}
});
this.globalEventService.publishInternalEvent('metaUpdated', updated);
return updated;
}
@bindThis
2022-09-17 18:27:08 +00:00
public onApplicationShutdown(signal?: string | undefined) {
2022-09-18 18:11:50 +00:00
clearInterval(this.intervalId);
this.redisForPubsub.off('message', this.onMessage);
2022-09-17 18:27:08 +00:00
}
}