重いのでジョブキュー無効化
This commit is contained in:
		
							parent
							
								
									7ca073aafd
								
							
						
					
					
						commit
						310371658b
					
				
					 6 changed files with 48 additions and 36 deletions
				
			
		| 
						 | 
				
			
			@ -5,9 +5,9 @@ program
 | 
			
		|||
	.version(pkg.version)
 | 
			
		||||
	.option('--no-daemons', 'Disable daemon processes (for debbuging)')
 | 
			
		||||
	.option('--disable-clustering', 'Disable clustering')
 | 
			
		||||
	.option('--disable-ap-queue', 'Disable creating job queue related to ap')
 | 
			
		||||
	.option('--disable-queue', 'Disable job queue processing')
 | 
			
		||||
	.option('--only-queue', 'Pocessing job queue only')
 | 
			
		||||
	.option('--only-server', 'Run server only (without job queue)')
 | 
			
		||||
	.option('--only-queue', 'Pocessing job queue only (without server)')
 | 
			
		||||
	.option('--quiet', 'Suppress all logs')
 | 
			
		||||
	.option('--verbose', 'Enable all logs')
 | 
			
		||||
	.option('--with-log-time', 'Include timestamp for each logs')
 | 
			
		||||
| 
						 | 
				
			
			@ -15,8 +15,7 @@ program
 | 
			
		|||
	.option('--color', 'This option is a dummy for some external program\'s (e.g. forever) issue.')
 | 
			
		||||
	.parse(process.argv);
 | 
			
		||||
 | 
			
		||||
/*if (process.env.MK_DISABLE_AP_QUEUE)*/ program.disableApQueue = true;
 | 
			
		||||
if (process.env.MK_DISABLE_QUEUE) program.disableQueue = true;
 | 
			
		||||
/*if (process.env.MK_DISABLE_QUEUE)*/ program.disableQueue = true;
 | 
			
		||||
if (process.env.MK_ONLY_QUEUE) program.onlyQueue = true;
 | 
			
		||||
 | 
			
		||||
export { program };
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -8,12 +8,13 @@ import handler from './processors';
 | 
			
		|||
import { queueLogger } from './logger';
 | 
			
		||||
 | 
			
		||||
const enableQueue = !program.disableQueue;
 | 
			
		||||
const enableQueueProcessing = !program.onlyServer && enableQueue;
 | 
			
		||||
const queueAvailable = config.redis != null;
 | 
			
		||||
 | 
			
		||||
const queue = initializeQueue();
 | 
			
		||||
 | 
			
		||||
function initializeQueue() {
 | 
			
		||||
	if (queueAvailable) {
 | 
			
		||||
	if (queueAvailable && enableQueue) {
 | 
			
		||||
		return new Queue('misskey', {
 | 
			
		||||
			redis: {
 | 
			
		||||
				port: config.redis.port,
 | 
			
		||||
| 
						 | 
				
			
			@ -42,7 +43,7 @@ export function deliver(user: ILocalUser, content: any, to: any) {
 | 
			
		|||
		to
 | 
			
		||||
	};
 | 
			
		||||
 | 
			
		||||
	if (queueAvailable && !program.disableApQueue) {
 | 
			
		||||
	if (queueAvailable && enableQueueProcessing) {
 | 
			
		||||
		return queue.createJob(data)
 | 
			
		||||
			.retries(8)
 | 
			
		||||
			.backoff('exponential', 1000)
 | 
			
		||||
| 
						 | 
				
			
			@ -59,7 +60,7 @@ export function processInbox(activity: any, signature: httpSignature.IParsedSign
 | 
			
		|||
		signature
 | 
			
		||||
	};
 | 
			
		||||
 | 
			
		||||
	if (queueAvailable && !program.disableApQueue) {
 | 
			
		||||
	if (queueAvailable && enableQueueProcessing) {
 | 
			
		||||
		return queue.createJob(data)
 | 
			
		||||
			.retries(3)
 | 
			
		||||
			.backoff('exponential', 500)
 | 
			
		||||
| 
						 | 
				
			
			@ -70,47 +71,59 @@ export function processInbox(activity: any, signature: httpSignature.IParsedSign
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
export function createExportNotesJob(user: ILocalUser) {
 | 
			
		||||
	if (!queueAvailable) throw 'queue unavailable';
 | 
			
		||||
 | 
			
		||||
	return queue.createJob({
 | 
			
		||||
	const data = {
 | 
			
		||||
		type: 'exportNotes',
 | 
			
		||||
		user: user
 | 
			
		||||
	})
 | 
			
		||||
		.save();
 | 
			
		||||
	};
 | 
			
		||||
 | 
			
		||||
	if (queueAvailable && enableQueueProcessing) {
 | 
			
		||||
		return queue.createJob(data).save();
 | 
			
		||||
	} else {
 | 
			
		||||
		return handler({ data }, () => {});
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export function createExportFollowingJob(user: ILocalUser) {
 | 
			
		||||
	if (!queueAvailable) throw 'queue unavailable';
 | 
			
		||||
 | 
			
		||||
	return queue.createJob({
 | 
			
		||||
	const data = {
 | 
			
		||||
		type: 'exportFollowing',
 | 
			
		||||
		user: user
 | 
			
		||||
	})
 | 
			
		||||
		.save();
 | 
			
		||||
	};
 | 
			
		||||
 | 
			
		||||
	if (queueAvailable && enableQueueProcessing) {
 | 
			
		||||
		return queue.createJob(data).save();
 | 
			
		||||
	} else {
 | 
			
		||||
		return handler({ data }, () => {});
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export function createExportMuteJob(user: ILocalUser) {
 | 
			
		||||
	if (!queueAvailable) throw 'queue unavailable';
 | 
			
		||||
 | 
			
		||||
	return queue.createJob({
 | 
			
		||||
	const data = {
 | 
			
		||||
		type: 'exportMute',
 | 
			
		||||
		user: user
 | 
			
		||||
	})
 | 
			
		||||
		.save();
 | 
			
		||||
	};
 | 
			
		||||
 | 
			
		||||
	if (queueAvailable && enableQueueProcessing) {
 | 
			
		||||
		return queue.createJob(data).save();
 | 
			
		||||
	} else {
 | 
			
		||||
		return handler({ data }, () => {});
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export function createExportBlockingJob(user: ILocalUser) {
 | 
			
		||||
	if (!queueAvailable) throw 'queue unavailable';
 | 
			
		||||
 | 
			
		||||
	return queue.createJob({
 | 
			
		||||
	const data = {
 | 
			
		||||
		type: 'exportBlocking',
 | 
			
		||||
		user: user
 | 
			
		||||
	})
 | 
			
		||||
		.save();
 | 
			
		||||
	};
 | 
			
		||||
 | 
			
		||||
	if (queueAvailable && enableQueueProcessing) {
 | 
			
		||||
		return queue.createJob(data).save();
 | 
			
		||||
	} else {
 | 
			
		||||
		return handler({ data }, () => {});
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export default function() {
 | 
			
		||||
	if (queueAvailable && enableQueue) {
 | 
			
		||||
	if (queueAvailable && enableQueueProcessing) {
 | 
			
		||||
		queue.process(128, handler);
 | 
			
		||||
		queueLogger.succ('Processing started');
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -48,7 +48,7 @@ export async function exportBlocking(job: bq.Job, done: any): Promise<void> {
 | 
			
		|||
 | 
			
		||||
		if (blockings.length === 0) {
 | 
			
		||||
			ended = true;
 | 
			
		||||
			job.reportProgress(100);
 | 
			
		||||
			if (job.reportProgress) job.reportProgress(100);
 | 
			
		||||
			break;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -74,7 +74,7 @@ export async function exportBlocking(job: bq.Job, done: any): Promise<void> {
 | 
			
		|||
			blockerId: user._id,
 | 
			
		||||
		});
 | 
			
		||||
 | 
			
		||||
		job.reportProgress(exportedCount / total);
 | 
			
		||||
		if (job.reportProgress) job.reportProgress(exportedCount / total);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	stream.end();
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -48,7 +48,7 @@ export async function exportFollowing(job: bq.Job, done: any): Promise<void> {
 | 
			
		|||
 | 
			
		||||
		if (followings.length === 0) {
 | 
			
		||||
			ended = true;
 | 
			
		||||
			job.reportProgress(100);
 | 
			
		||||
			if (job.reportProgress) job.reportProgress(100);
 | 
			
		||||
			break;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -74,7 +74,7 @@ export async function exportFollowing(job: bq.Job, done: any): Promise<void> {
 | 
			
		|||
			followerId: user._id,
 | 
			
		||||
		});
 | 
			
		||||
 | 
			
		||||
		job.reportProgress(exportedCount / total);
 | 
			
		||||
		if (job.reportProgress) job.reportProgress(exportedCount / total);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	stream.end();
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -48,7 +48,7 @@ export async function exportMute(job: bq.Job, done: any): Promise<void> {
 | 
			
		|||
 | 
			
		||||
		if (mutes.length === 0) {
 | 
			
		||||
			ended = true;
 | 
			
		||||
			job.reportProgress(100);
 | 
			
		||||
			if (job.reportProgress) job.reportProgress(100);
 | 
			
		||||
			break;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -74,7 +74,7 @@ export async function exportMute(job: bq.Job, done: any): Promise<void> {
 | 
			
		|||
			muterId: user._id,
 | 
			
		||||
		});
 | 
			
		||||
 | 
			
		||||
		job.reportProgress(exportedCount / total);
 | 
			
		||||
		if (job.reportProgress) job.reportProgress(exportedCount / total);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	stream.end();
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -58,7 +58,7 @@ export async function exportNotes(job: bq.Job, done: any): Promise<void> {
 | 
			
		|||
 | 
			
		||||
		if (notes.length === 0) {
 | 
			
		||||
			ended = true;
 | 
			
		||||
			job.reportProgress(100);
 | 
			
		||||
			if (job.reportProgress) job.reportProgress(100);
 | 
			
		||||
			break;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -83,7 +83,7 @@ export async function exportNotes(job: bq.Job, done: any): Promise<void> {
 | 
			
		|||
			userId: user._id,
 | 
			
		||||
		});
 | 
			
		||||
 | 
			
		||||
		job.reportProgress(exportedNotesCount / total);
 | 
			
		||||
		if (job.reportProgress) job.reportProgress(exportedNotesCount / total);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	await new Promise((res, rej) => {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue