优化队列

This commit is contained in:
ap 2021-12-24 18:33:00 +08:00
parent e06968d20d
commit b23f5a3375
8 changed files with 74 additions and 72 deletions

View File

@ -8,8 +8,8 @@
"@cool-midway/core": "^4.0.15",
"@cool-midway/es": "^4.0.2",
"@cool-midway/oss": "^4.0.3",
"@cool-midway/queue": "^4.0.9",
"@cool-midway/redis": "^4.0.3",
"@cool-midway/queue": "/Users/mac/Documents/src/cool/admin/midway-core/queue/dist/",
"@cool-midway/redis": "^4.0.5",
"@cool-midway/socket": "^4.0.2",
"@cool-midway/wxpay": "^4.0.3",
"@midwayjs/bootstrap": "^2.14.0",

View File

@ -11,8 +11,8 @@ export default (app: Application) => {
// 模块描述
description: '演示示例',
// 中间件
middlewares: ['testMiddleware'],
middlewares: [],
// 全局中间件
globalMiddlewares: ['demoUserMiddleware'],
globalMiddlewares: [],
} as ModuleConfig;
};

View File

@ -1,8 +1,8 @@
import { Get, Inject, Post, Provide } from '@midwayjs/decorator';
import { CoolController, BaseController, CoolUrlTag } from '@cool-midway/core';
import { IQueue } from '@cool-midway/queue';
import { DemoGoodsEntity } from '../../entity/goods';
import { DemoGoodsService } from '../../service/goods';
import { DemoQueue } from '../../queue/demo';
/**
*
@ -30,7 +30,7 @@ export class DemoAppGoodsController extends BaseController {
// 队列
@Inject()
demoQueue: IQueue;
demoQueue: DemoQueue;
/**
*
@ -47,12 +47,7 @@ export class DemoAppGoodsController extends BaseController {
*/
@Post('/queue', { summary: '发送队列数据' })
async queue() {
this.demoQueue.queue.add(
{ a: 1 },
{
removeOnComplete: true,
removeOnFail: true,
}
);
console.log(111, this.demoQueue);
this.demoQueue.add({ a: 2 }, { delay: 2000 });
}
}

View File

@ -1,13 +1,14 @@
import { App, Provide } from '@midwayjs/decorator';
import { App, Provide, Scope, ScopeEnum } from '@midwayjs/decorator';
import { IMidwayWebApplication } from '@midwayjs/web';
import { ICoolQueue, Queue } from '@cool-midway/queue';
import { BaseCoolQueue, Queue } from '@cool-midway/queue';
/**
*
*/
@Queue()
@Scope(ScopeEnum.Singleton)
@Provide()
export abstract class DemoQueue implements ICoolQueue {
export class DemoQueue extends BaseCoolQueue {
@App()
app: IMidwayWebApplication;

View File

@ -1,30 +1,30 @@
import { App, Inject, Provide } from '@midwayjs/decorator';
import { IMidwayWebApplication } from '@midwayjs/web';
import { ICoolQueue, Queue } from '@cool-midway/queue';
import { BaseCoolQueue, Queue } from '@cool-midway/queue';
import { TaskInfoService } from '../service/info';
import { Job } from 'bullmq';
/**
*
*/
@Queue()
@Provide()
export abstract class TaskInfoQueue implements ICoolQueue {
export abstract class TaskInfoQueue extends BaseCoolQueue {
@App()
app: IMidwayWebApplication;
@Inject()
taskInfoService: TaskInfoService;
async data(job: any, done: any): Promise<void> {
async data(job: Job, done: any): Promise<void> {
try {
console.log('收到的数据', job.data);
const result = await this.taskInfoService.invokeService(job.data.service);
this.taskInfoService.record(job.data, 1, JSON.stringify(result));
} catch (error) {
this.taskInfoService.record(job.data, 0, error);
}
this.taskInfoService.updateNextRunTime(job.data.id);
this.taskInfoService.updateStatus();
this.taskInfoService.updateStatus(job.data.id);
done();
}
}

View File

@ -4,11 +4,11 @@ import { InjectEntityModel } from '@midwayjs/orm';
import { Repository } from 'typeorm';
import { TaskInfoEntity } from '../entity/info';
import { TaskLogEntity } from '../entity/log';
import { IQueue } from '@cool-midway/queue';
import { ILogger } from '@midwayjs/logger';
import { IMidwayWebApplication } from '@midwayjs/web';
import * as _ from 'lodash';
import { Utils } from '../../../comm/utils';
import { TaskInfoQueue } from '../queue/task';
/**
*
@ -25,7 +25,7 @@ export class TaskInfoService extends BaseService {
taskLogEntity: Repository<TaskLogEntity>;
@Inject()
taskInfoQueue: IQueue;
taskInfoQueue: TaskInfoQueue;
@App()
app: IMidwayWebApplication;
@ -42,9 +42,7 @@ export class TaskInfoService extends BaseService {
if (task) {
const job = await this.exist(task.id);
if (job) {
await this.taskInfoQueue.queue.removeRepeatable(
JSON.parse(task.repeatConf)
);
await this.taskInfoQueue.removeRepeatable(JSON.parse(task.repeatConf));
}
task.status = 0;
await this.taskInfoEntity.update(task.id, task);
@ -73,8 +71,8 @@ export class TaskInfoService extends BaseService {
async once(id) {
const task = await this.taskInfoEntity.findOne({ id });
if (task) {
await this.taskInfoQueue.queue.add(task, {
jobId: task.id,
await this.taskInfoQueue.add(task, {
jobId: task.id.toString(),
removeOnComplete: true,
removeOnFail: true,
});
@ -86,8 +84,7 @@ export class TaskInfoService extends BaseService {
* @param jobId
*/
async exist(jobId) {
console.log(jobId);
const result = await this.taskInfoQueue.queue.getRepeatableJobs();
const result = await this.taskInfoQueue.getRepeatableJobs();
const ids = result.map(e => {
return e.id;
});
@ -112,26 +109,33 @@ export class TaskInfoService extends BaseService {
if (params.status === 1) {
const exist = await this.exist(params.id);
if (exist) {
await this.taskInfoQueue.queue.removeRepeatable(
await this.taskInfoQueue.removeRepeatable(
JSON.parse(params.repeatConf)
);
}
const jobOp = Object.assign(params);
await this.utils.removeEmptyP(jobOp);
delete jobOp.repeatConf;
const { opts } = await this.taskInfoQueue.queue.add(params, {
const { every, limit, startDate, endDate, cron } = params;
const repeat = {
every,
limit,
jobId: params.id,
startDate,
endDate,
cron,
};
await this.utils.removeEmptyP(repeat);
const result = await this.taskInfoQueue.add(params, {
jobId: params.id,
removeOnComplete: true,
removeOnFail: true,
repeat: jobOp,
repeat,
});
if (!opts) {
throw new Error('任务添加失败,可能由于格式不正确~');
if (!result) {
throw new Error('任务添加失败,请检查任务配置');
}
// await transactionalEntityManager.update(TaskInfoEntity, params.id, {
// jobId: opts.jobId,
// });
repeatConf = opts;
repeatConf = result.opts;
}
});
if (params.status === 1) {
@ -159,9 +163,7 @@ export class TaskInfoService extends BaseService {
const task = await this.taskInfoEntity.findOne({ id });
const exist = await this.exist(task.id);
if (exist) {
await this.taskInfoQueue.queue.removeRepeatable(
JSON.parse(task.repeatConf)
);
await this.taskInfoQueue.removeRepeatable(JSON.parse(task.repeatConf));
}
await this.taskInfoEntity.delete({ id });
await this.taskLogEntity.delete({ taskId: id });
@ -237,12 +239,10 @@ export class TaskInfoService extends BaseService {
*/
async getNextRunTime(jobId) {
let nextRunTime;
const result = await this.taskInfoQueue.queue.getRepeatableJobs();
for (const task of result) {
if (task.id === jobId.toString()) {
const result = await this.taskInfoQueue.getRepeatableJobs();
const task = _.find(result, { id: jobId + '' });
if (task) {
nextRunTime = new Date(task.next);
break;
}
}
return nextRunTime;
}
@ -261,25 +261,21 @@ export class TaskInfoService extends BaseService {
/**
*
*/
async updateStatus() {
const result = await this.taskInfoQueue.queue.getRepeatableJobs();
for (const job of result) {
async updateStatus(jobId) {
const result = await this.taskInfoQueue.getRepeatableJobs();
const job = _.find(result, { id: jobId + '' });
// @ts-ignore
const task = await this.taskInfoEntity.findOne({ id: job.id });
if (task) {
setTimeout(async () => {
// 2秒后清空任务
const nextTime = await this.getNextRunTime(task.id);
if (nextTime && nextTime.getTime() <= new Date().getTime() - 999) {
this.nativeQuery(
'update task_info a set a.status = ?, a.updateTime = ? where a.id = ?',
[0, new Date(), task.id]
);
this.taskInfoQueue.queue.removeRepeatable(
JSON.parse(task.repeatConf)
);
}
}, 2000);
if (task) {
if (task.nextRunTime.getTime() == nextTime.getTime()) {
task.status = 0;
task.nextRunTime = nextTime;
this.taskInfoQueue.removeRepeatableByKey(job.key);
} else {
task.nextRunTime = nextTime;
}
await this.taskInfoEntity.update(task.id, task);
}
}

View File

@ -10,12 +10,12 @@ export default (appInfo: EggAppInfo) => {
host: '127.0.0.1',
port: 3306,
username: 'root',
password: '123456',
password: '123123',
database: 'cool',
// 自动建表 注意:线上部署的时候不要使用,有可能导致数据丢失
synchronize: true,
// 打印日志
logging: true,
logging: false,
// 字符集
charset: 'utf8mb4',
// 驱动
@ -30,5 +30,15 @@ export default (appInfo: EggAppInfo) => {
},
};
config.cool = {
// redis为插件名称
redis: {
host: '127.0.0.1',
password: '',
port: 6379,
db: 0,
},
};
return config;
};

View File

@ -6,8 +6,8 @@ import * as orm from '@midwayjs/orm';
import * as cool from '@cool-midway/core';
// import * as wxpay from '@cool-midway/wxpay';
import * as oss from '@cool-midway/oss';
// import * as redis from '@cool-midway/redis';
// import * as queue from '@cool-midway/queue';
import * as redis from '@cool-midway/redis';
import * as queue from '@cool-midway/queue';
// import * as alipay from '@cool-midway/alipay';
// import * as socket from '@cool-midway/socket';
@ -23,9 +23,9 @@ import * as oss from '@cool-midway/oss';
// oss插件需要到后台配置之后才有用默认是本地上传
oss,
// 将缓存替换成redis
//redis,
redis,
// 队列
//queue,
queue,
// 微信支付
//wxpay,
// 支付宝支付