fix bullmq type

This commit is contained in:
xiaopeng 2025-02-20 00:20:39 +08:00
parent e2cda6aa5a
commit 97e29ddc7f
4 changed files with 43 additions and 45 deletions

View File

@ -2,8 +2,8 @@
"task_info": [
{
"id": 1,
"jobId": "089f554c-fdd4-4093-9f84-4cfb6af2f514",
"repeatConf": "{\"count\":1,\"type\":1,\"limit\":5,\"name\":\"每秒执行,总共5次\",\"taskType\":1,\"every\":1000,\"service\":\"taskDemoService.test()\",\"status\":1,\"id\":1,\"createTime\":\"2021-03-10 14:25:13\",\"updateTime\":\"2021-03-10 14:25:13\",\"jobId\":1}",
"jobId": null,
"repeatConf": null,
"name": "每秒执行一次",
"cron": null,
"limit": null,
@ -15,13 +15,13 @@
"data": null,
"service": "taskDemoService.test(1,2)",
"type": 1,
"nextRunTime": "2021-3-10 14:25:18",
"nextRunTime": null,
"taskType": 1
},
{
"id": 2,
"jobId": "9e1f42c8-b127-449b-b0a4-d53c60b79e75",
"repeatConf": "{\"count\":1,\"id\":2,\"createTime\":\"2021-03-10 14:25:53\",\"updateTime\":\"2021-03-10 14:25:55\",\"name\":\"cron任务5秒执行一次\",\"cron\":\"0/5 * * * * ? \",\"status\":1,\"service\":\"taskDemoService.test()\",\"type\":1,\"nextRunTime\":\"2021-03-10 14:26:00\",\"taskType\":0,\"jobId\":2}",
"jobId": null,
"repeatConf": null,
"name": "cron任务5秒执行一次",
"cron": "0/5 * * * * * ",
"limit": null,

View File

@ -17,8 +17,8 @@ import * as _ from 'lodash';
import { Utils } from '../../../comm/utils';
import { TaskInfoQueue } from '../queue/task';
import { IMidwayApplication } from '@midwayjs/core';
import { v4 as uuidv4 } from 'uuid';
import * as moment from 'moment';
/**
*
*/
@ -55,7 +55,7 @@ export class TaskBullService extends BaseService {
if (task) {
const result = await this.taskInfoQueue.getJobSchedulers();
const job = _.find(result, e => {
return e.template?.data?.jobId === task.jobId;
return e.key == task.jobId;
});
if (job) {
await this.taskInfoQueue.removeJobScheduler(job.key);
@ -72,7 +72,7 @@ export class TaskBullService extends BaseService {
async remove(taskId) {
const info = await this.taskInfoEntity.findOneBy({ id: Equal(taskId) });
const result = await this.taskInfoQueue.getJobSchedulers();
const job = _.find(result, { id: info?.jobId });
const job = _.find(result, { key: info?.jobId });
if (job) {
await this.taskInfoQueue.removeJobScheduler(job.key);
}
@ -116,11 +116,14 @@ export class TaskBullService extends BaseService {
*/
async exist(jobId) {
const info = await this.taskInfoEntity.findOneBy({ jobId: Equal(jobId) });
if (!info) {
return false;
}
const result = await this.taskInfoQueue.getJobSchedulers();
const ids = result.map(e => {
return e.id;
const job = _.find(result, e => {
return e.key == info.jobId;
});
return ids.includes(info?.jobId);
return !!job;
}
/**
*
@ -128,10 +131,7 @@ export class TaskBullService extends BaseService {
*/
async addOrUpdate(params) {
delete params.repeatCount;
let repeatConf;
if (!params.jobId) {
params.jobId = uuidv4();
}
let repeatConf, jobId;
await this.getOrmManager().transaction(async transactionalEntityManager => {
if (params.taskType === 0) {
params.limit = null;
@ -161,21 +161,19 @@ export class TaskBullService extends BaseService {
removeOnFail: true,
repeat,
});
if (!result) {
if (!result?.repeatJobKey) {
throw new Error('任务添加失败,请检查任务配置');
}
// await transactionalEntityManager.update(TaskInfoEntity, params.id, {
// jobId: params.id,
// type: params.type,
// });
jobId = result.repeatJobKey;
repeatConf = result.opts;
}
});
if (params.status === 1) {
this.utils.sleep(1000);
await this.updateNextRunTime(params.jobId);
await this.taskInfoEntity.update(params.id, {
repeatConf: JSON.stringify(repeatConf.repeat),
status: 1,
jobId,
});
}
}
@ -209,8 +207,11 @@ export class TaskBullService extends BaseService {
*/
async record(task, status, detail?) {
const info = await this.taskInfoEntity.findOneBy({
jobId: Equal(task.jobId),
id: Equal(task.id),
});
if (!info) {
return;
}
await this.taskLogEntity.save({
taskId: info.id,
status,
@ -249,7 +250,7 @@ export class TaskBullService extends BaseService {
let nextRunTime;
const result = await this.taskInfoQueue.getJobSchedulers();
const task = _.find(result, e => {
return e.template?.data?.jobId === jobId;
return e.key === jobId;
});
if (task) {
nextRunTime = new Date(task.next);
@ -261,10 +262,14 @@ export class TaskBullService extends BaseService {
* @param jobId
*/
async updateNextRunTime(jobId) {
const nextRunTime = await this.getNextRunTime(jobId);
if (!nextRunTime) {
return;
}
await this.taskInfoEntity.update(
{ jobId },
{
nextRunTime: await this.getNextRunTime(jobId),
nextRunTime,
}
);
}
@ -283,22 +288,19 @@ export class TaskBullService extends BaseService {
/**
*
*/
async updateStatus(jobId) {
async updateStatus(jobId: number) {
const task = await this.taskInfoEntity.findOneBy({ id: jobId });
if (!task) {
return;
}
const result = await this.taskInfoQueue.getJobSchedulers();
const job = _.find(result, { id: jobId + '' });
const job = _.find(result, { key: task.jobId });
if (!job) {
return;
}
const task = await this.taskInfoEntity.findOneBy({ id: job.id });
const nextTime = await this.getNextRunTime(task.jobId);
if (task) {
// if (task.nextRunTime.getTime() == nextTime.getTime()) {
// task.status = 0;
// task.nextRunTime = nextTime;
// this.taskInfoQueue.removeRepeatableByKey(job.key);
// } else {
task.nextRunTime = nextTime;
// }
await this.taskInfoEntity.update(task.id, task);
}
}

View File

@ -64,8 +64,8 @@ export class TaskInfoService extends BaseService {
*/
async stop(id) {
this.type === 'bull'
? this.taskBullService.stop(id)
: this.taskLocalService.stop(id);
? await this.taskBullService.stop(id)
: await this.taskLocalService.stop(id);
}
/**
@ -75,8 +75,8 @@ export class TaskInfoService extends BaseService {
*/
async start(id, type?) {
this.type === 'bull'
? this.taskBullService.start(id)
: this.taskLocalService.start(id, type);
? await this.taskBullService.start(id)
: await this.taskLocalService.start(id, type);
}
/**
*

View File

@ -165,16 +165,12 @@ export class TaskLocalService extends BaseService {
this.cronJobs.delete(params.jobId);
this.coolEventManager.emit('onLocalTaskStop', params.jobId);
}
setTimeout(async () => {
this.createCronJob(params);
}, 1000);
this.createCronJob(params);
}
});
if (params.status === 1) {
setTimeout(async () => {
await this.updateNextRunTime(params.jobId);
}, 1000);
await this.updateNextRunTime(params.jobId);
}
}
@ -315,8 +311,8 @@ export class TaskLocalService extends BaseService {
return;
}
try {
const currentTime = moment();
const lockExpireTime = moment().add(5, 'minutes');
const currentTime = moment().toDate();
const lockExpireTime = moment().add(5, 'minutes').toDate();
const result = await this.taskInfoEntity
.createQueryBuilder()
.update()