优化任务调度

This commit is contained in:
小鹏 2022-07-27 17:14:52 +08:00
parent ecb34fc134
commit adf21bbe09

View File

@ -1,5 +1,6 @@
import { import {
App, App,
Init,
Inject, Inject,
Logger, Logger,
Provide, Provide,
@ -59,6 +60,16 @@ export class TaskInfoService extends BaseService {
} }
} }
/**
*
* @param taskId
*/
async remove(taskId) {
const result = await this.taskInfoQueue.getRepeatableJobs();
const job = _.find(result, { id: taskId + '' });
await this.taskInfoQueue.removeRepeatableByKey(job.key);
}
/** /**
* *
* @param id * @param id
@ -67,7 +78,7 @@ export class TaskInfoService extends BaseService {
async start(id, type?) { async start(id, type?) {
const task = await this.taskInfoEntity.findOne({ id }); const task = await this.taskInfoEntity.findOne({ id });
task.status = 1; task.status = 1;
if (type) { if (type || type == 0) {
task.type = type; task.type = type;
} }
await this.addOrUpdate(task); await this.addOrUpdate(task);
@ -111,6 +122,7 @@ export class TaskInfoService extends BaseService {
* @param params * @param params
*/ */
async addOrUpdate(params) { async addOrUpdate(params) {
delete params.repeatCount;
let repeatConf; let repeatConf;
await this.getOrmManager().transaction(async transactionalEntityManager => { await this.getOrmManager().transaction(async transactionalEntityManager => {
if (params.taskType === 0) { if (params.taskType === 0) {
@ -124,7 +136,7 @@ export class TaskInfoService extends BaseService {
if (params.status === 1) { if (params.status === 1) {
const exist = await this.exist(params.id); const exist = await this.exist(params.id);
if (exist) { if (exist) {
this.stop(params.id); await this.remove(params.id);
} }
const { every, limit, startDate, endDate, cron } = params; const { every, limit, startDate, endDate, cron } = params;
const repeat = { const repeat = {
@ -146,7 +158,8 @@ export class TaskInfoService extends BaseService {
throw new Error('任务添加失败,请检查任务配置'); throw new Error('任务添加失败,请检查任务配置');
} }
// await transactionalEntityManager.update(TaskInfoEntity, params.id, { // await transactionalEntityManager.update(TaskInfoEntity, params.id, {
// jobId: opts.jobId, // jobId: params.id,
// type: params.type,
// }); // });
repeatConf = result.opts; repeatConf = result.opts;
} }
@ -233,17 +246,20 @@ export class TaskInfoService extends BaseService {
/** /**
* *
*/ */
@Init()
async initTask() { async initTask() {
const runningTasks = await this.taskInfoEntity.find({ status: 1 }); setTimeout(async () => {
if (!_.isEmpty(runningTasks)) { const runningTasks = await this.taskInfoEntity.find({ status: 1 });
for (const task of runningTasks) { if (!_.isEmpty(runningTasks)) {
const job = await this.exist(task.id); // 任务已存在就不添加 for (const task of runningTasks) {
if (!job) { const job = await this.exist(task.id); // 任务已存在就不添加
this.logger.info(`init task ${task.name}`); if (!job) {
await this.addOrUpdate(task); this.logger.info(`init task ${task.name}`);
await this.addOrUpdate(task);
}
} }
} }
} }, 3000);
} }
/** /**
@ -290,6 +306,9 @@ export class TaskInfoService extends BaseService {
async updateStatus(jobId) { async updateStatus(jobId) {
const result = await this.taskInfoQueue.getRepeatableJobs(); const result = await this.taskInfoQueue.getRepeatableJobs();
const job = _.find(result, { id: jobId + '' }); const job = _.find(result, { id: jobId + '' });
if (!job) {
return;
}
// @ts-ignore // @ts-ignore
const task = await this.taskInfoEntity.findOne({ id: job.id }); const task = await this.taskInfoEntity.findOne({ id: job.id });
const nextTime = await this.getNextRunTime(task.id); const nextTime = await this.getNextRunTime(task.id);