diff --git a/src/modules/task/db.json b/src/modules/task/db.json index d293962..43d95a0 100644 --- a/src/modules/task/db.json +++ b/src/modules/task/db.json @@ -2,8 +2,8 @@ "task_info": [ { "id": 1, - "jobId": "089f554c-fdd4-4093-9f84-4cfb6af2f514", - "repeatConf": "{\"count\":1,\"type\":1,\"limit\":5,\"name\":\"每秒执行,总共5次\",\"taskType\":1,\"every\":1000,\"service\":\"taskDemoService.test()\",\"status\":1,\"id\":1,\"createTime\":\"2021-03-10 14:25:13\",\"updateTime\":\"2021-03-10 14:25:13\",\"jobId\":1}", + "jobId": null, + "repeatConf": null, "name": "每秒执行一次", "cron": null, "limit": null, @@ -15,13 +15,13 @@ "data": null, "service": "taskDemoService.test(1,2)", "type": 1, - "nextRunTime": "2021-3-10 14:25:18", + "nextRunTime": null, "taskType": 1 }, { "id": 2, - "jobId": "9e1f42c8-b127-449b-b0a4-d53c60b79e75", - "repeatConf": "{\"count\":1,\"id\":2,\"createTime\":\"2021-03-10 14:25:53\",\"updateTime\":\"2021-03-10 14:25:55\",\"name\":\"cron任务,5秒执行一次\",\"cron\":\"0/5 * * * * ? \",\"status\":1,\"service\":\"taskDemoService.test()\",\"type\":1,\"nextRunTime\":\"2021-03-10 14:26:00\",\"taskType\":0,\"jobId\":2}", + "jobId": null, + "repeatConf": null, "name": "cron任务,5秒执行一次", "cron": "0/5 * * * * * ", "limit": null, diff --git a/src/modules/task/service/bull.ts b/src/modules/task/service/bull.ts index 4ffc4fb..acf386f 100644 --- a/src/modules/task/service/bull.ts +++ b/src/modules/task/service/bull.ts @@ -17,8 +17,8 @@ import * as _ from 'lodash'; import { Utils } from '../../../comm/utils'; import { TaskInfoQueue } from '../queue/task'; import { IMidwayApplication } from '@midwayjs/core'; -import { v4 as uuidv4 } from 'uuid'; import * as moment from 'moment'; + /** * 任务 */ @@ -55,7 +55,7 @@ export class TaskBullService extends BaseService { if (task) { const result = await this.taskInfoQueue.getJobSchedulers(); const job = _.find(result, e => { - return e.template?.data?.jobId === task.jobId; + return e.key == task.jobId; }); if (job) { await this.taskInfoQueue.removeJobScheduler(job.key); @@ -72,7 +72,7 @@ export class TaskBullService extends BaseService { async remove(taskId) { const info = await this.taskInfoEntity.findOneBy({ id: Equal(taskId) }); const result = await this.taskInfoQueue.getJobSchedulers(); - const job = _.find(result, { id: info?.jobId }); + const job = _.find(result, { key: info?.jobId }); if (job) { await this.taskInfoQueue.removeJobScheduler(job.key); } @@ -116,11 +116,14 @@ export class TaskBullService extends BaseService { */ async exist(jobId) { const info = await this.taskInfoEntity.findOneBy({ jobId: Equal(jobId) }); + if (!info) { + return false; + } const result = await this.taskInfoQueue.getJobSchedulers(); - const ids = result.map(e => { - return e.id; + const job = _.find(result, e => { + return e.key == info.jobId; }); - return ids.includes(info?.jobId); + return !!job; } /** * 新增或修改 @@ -128,10 +131,7 @@ export class TaskBullService extends BaseService { */ async addOrUpdate(params) { delete params.repeatCount; - let repeatConf; - if (!params.jobId) { - params.jobId = uuidv4(); - } + let repeatConf, jobId; await this.getOrmManager().transaction(async transactionalEntityManager => { if (params.taskType === 0) { params.limit = null; @@ -161,21 +161,19 @@ export class TaskBullService extends BaseService { removeOnFail: true, repeat, }); - if (!result) { + if (!result?.repeatJobKey) { throw new Error('任务添加失败,请检查任务配置'); } - // await transactionalEntityManager.update(TaskInfoEntity, params.id, { - // jobId: params.id, - // type: params.type, - // }); + jobId = result.repeatJobKey; repeatConf = result.opts; } }); if (params.status === 1) { - this.utils.sleep(1000); await this.updateNextRunTime(params.jobId); await this.taskInfoEntity.update(params.id, { repeatConf: JSON.stringify(repeatConf.repeat), + status: 1, + jobId, }); } } @@ -209,8 +207,11 @@ export class TaskBullService extends BaseService { */ async record(task, status, detail?) { const info = await this.taskInfoEntity.findOneBy({ - jobId: Equal(task.jobId), + id: Equal(task.id), }); + if (!info) { + return; + } await this.taskLogEntity.save({ taskId: info.id, status, @@ -249,7 +250,7 @@ export class TaskBullService extends BaseService { let nextRunTime; const result = await this.taskInfoQueue.getJobSchedulers(); const task = _.find(result, e => { - return e.template?.data?.jobId === jobId; + return e.key === jobId; }); if (task) { nextRunTime = new Date(task.next); @@ -261,10 +262,14 @@ export class TaskBullService extends BaseService { * @param jobId */ async updateNextRunTime(jobId) { + const nextRunTime = await this.getNextRunTime(jobId); + if (!nextRunTime) { + return; + } await this.taskInfoEntity.update( { jobId }, { - nextRunTime: await this.getNextRunTime(jobId), + nextRunTime, } ); } @@ -283,22 +288,19 @@ export class TaskBullService extends BaseService { /** * 刷新任务状态 */ - async updateStatus(jobId) { + async updateStatus(jobId: number) { + const task = await this.taskInfoEntity.findOneBy({ id: jobId }); + if (!task) { + return; + } const result = await this.taskInfoQueue.getJobSchedulers(); - const job = _.find(result, { id: jobId + '' }); + const job = _.find(result, { key: task.jobId }); if (!job) { return; } - const task = await this.taskInfoEntity.findOneBy({ id: job.id }); const nextTime = await this.getNextRunTime(task.jobId); if (task) { - // if (task.nextRunTime.getTime() == nextTime.getTime()) { - // task.status = 0; - // task.nextRunTime = nextTime; - // this.taskInfoQueue.removeRepeatableByKey(job.key); - // } else { task.nextRunTime = nextTime; - // } await this.taskInfoEntity.update(task.id, task); } } diff --git a/src/modules/task/service/info.ts b/src/modules/task/service/info.ts index b1acd64..d11cf0c 100644 --- a/src/modules/task/service/info.ts +++ b/src/modules/task/service/info.ts @@ -64,8 +64,8 @@ export class TaskInfoService extends BaseService { */ async stop(id) { this.type === 'bull' - ? this.taskBullService.stop(id) - : this.taskLocalService.stop(id); + ? await this.taskBullService.stop(id) + : await this.taskLocalService.stop(id); } /** @@ -75,8 +75,8 @@ export class TaskInfoService extends BaseService { */ async start(id, type?) { this.type === 'bull' - ? this.taskBullService.start(id) - : this.taskLocalService.start(id, type); + ? await this.taskBullService.start(id) + : await this.taskLocalService.start(id, type); } /** * 手动执行一次 diff --git a/src/modules/task/service/local.ts b/src/modules/task/service/local.ts index 286bfd4..55e1d2a 100644 --- a/src/modules/task/service/local.ts +++ b/src/modules/task/service/local.ts @@ -165,16 +165,12 @@ export class TaskLocalService extends BaseService { this.cronJobs.delete(params.jobId); this.coolEventManager.emit('onLocalTaskStop', params.jobId); } - setTimeout(async () => { - this.createCronJob(params); - }, 1000); + this.createCronJob(params); } }); if (params.status === 1) { - setTimeout(async () => { - await this.updateNextRunTime(params.jobId); - }, 1000); + await this.updateNextRunTime(params.jobId); } } @@ -315,8 +311,8 @@ export class TaskLocalService extends BaseService { return; } try { - const currentTime = moment(); - const lockExpireTime = moment().add(5, 'minutes'); + const currentTime = moment().toDate(); + const lockExpireTime = moment().add(5, 'minutes').toDate(); const result = await this.taskInfoEntity .createQueryBuilder() .update()