diff --git a/src/modules/task/service/info.ts b/src/modules/task/service/info.ts index 91e6833..56611ac 100644 --- a/src/modules/task/service/info.ts +++ b/src/modules/task/service/info.ts @@ -1,5 +1,6 @@ import { App, + Init, Inject, Logger, Provide, @@ -59,6 +60,16 @@ export class TaskInfoService extends BaseService { } } + /** + * 移除任务 + * @param taskId + */ + async remove(taskId) { + const result = await this.taskInfoQueue.getRepeatableJobs(); + const job = _.find(result, { id: taskId + '' }); + await this.taskInfoQueue.removeRepeatableByKey(job.key); + } + /** * 开始任务 * @param id @@ -67,7 +78,7 @@ export class TaskInfoService extends BaseService { async start(id, type?) { const task = await this.taskInfoEntity.findOne({ id }); task.status = 1; - if (type) { + if (type || type == 0) { task.type = type; } await this.addOrUpdate(task); @@ -111,6 +122,7 @@ export class TaskInfoService extends BaseService { * @param params */ async addOrUpdate(params) { + delete params.repeatCount; let repeatConf; await this.getOrmManager().transaction(async transactionalEntityManager => { if (params.taskType === 0) { @@ -124,7 +136,7 @@ export class TaskInfoService extends BaseService { if (params.status === 1) { const exist = await this.exist(params.id); if (exist) { - this.stop(params.id); + await this.remove(params.id); } const { every, limit, startDate, endDate, cron } = params; const repeat = { @@ -146,7 +158,8 @@ export class TaskInfoService extends BaseService { throw new Error('任务添加失败,请检查任务配置'); } // await transactionalEntityManager.update(TaskInfoEntity, params.id, { - // jobId: opts.jobId, + // jobId: params.id, + // type: params.type, // }); repeatConf = result.opts; } @@ -233,17 +246,20 @@ export class TaskInfoService extends BaseService { /** * 初始化任务 */ + @Init() async initTask() { - const runningTasks = await this.taskInfoEntity.find({ status: 1 }); - if (!_.isEmpty(runningTasks)) { - for (const task of runningTasks) { - const job = await this.exist(task.id); // 任务已存在就不添加 - if (!job) { - this.logger.info(`init task ${task.name}`); - await this.addOrUpdate(task); + setTimeout(async () => { + const runningTasks = await this.taskInfoEntity.find({ status: 1 }); + if (!_.isEmpty(runningTasks)) { + for (const task of runningTasks) { + const job = await this.exist(task.id); // 任务已存在就不添加 + if (!job) { + this.logger.info(`init task ${task.name}`); + await this.addOrUpdate(task); + } } } - } + }, 3000); } /** @@ -290,6 +306,9 @@ export class TaskInfoService extends BaseService { async updateStatus(jobId) { const result = await this.taskInfoQueue.getRepeatableJobs(); const job = _.find(result, { id: jobId + '' }); + if (!job) { + return; + } // @ts-ignore const task = await this.taskInfoEntity.findOne({ id: job.id }); const nextTime = await this.getNextRunTime(task.id);