From b23f5a33750b405c64187ab5a6108e454383e22d Mon Sep 17 00:00:00 2001 From: ap <951984189@qq.com> Date: Fri, 24 Dec 2021 18:33:00 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package.json | 4 +- src/app/modules/demo/config.ts | 4 +- src/app/modules/demo/controller/app/goods.ts | 13 +-- src/app/modules/demo/queue/demo.ts | 7 +- src/app/modules/task/queue/task.ts | 10 +-- src/app/modules/task/service/info.ts | 86 ++++++++++---------- src/config/config.local.ts | 14 +++- src/configuration.ts | 8 +- 8 files changed, 74 insertions(+), 72 deletions(-) diff --git a/package.json b/package.json index 090ad6c..85d16ff 100755 --- a/package.json +++ b/package.json @@ -8,8 +8,8 @@ "@cool-midway/core": "^4.0.15", "@cool-midway/es": "^4.0.2", "@cool-midway/oss": "^4.0.3", - "@cool-midway/queue": "^4.0.9", - "@cool-midway/redis": "^4.0.3", + "@cool-midway/queue": "/Users/mac/Documents/src/cool/admin/midway-core/queue/dist/", + "@cool-midway/redis": "^4.0.5", "@cool-midway/socket": "^4.0.2", "@cool-midway/wxpay": "^4.0.3", "@midwayjs/bootstrap": "^2.14.0", diff --git a/src/app/modules/demo/config.ts b/src/app/modules/demo/config.ts index 0511205..0daa6cf 100644 --- a/src/app/modules/demo/config.ts +++ b/src/app/modules/demo/config.ts @@ -11,8 +11,8 @@ export default (app: Application) => { // 模块描述 description: '演示示例', // 中间件 - middlewares: ['testMiddleware'], + middlewares: [], // 全局中间件 - globalMiddlewares: ['demoUserMiddleware'], + globalMiddlewares: [], } as ModuleConfig; }; diff --git a/src/app/modules/demo/controller/app/goods.ts b/src/app/modules/demo/controller/app/goods.ts index 264fc2a..b5d14cc 100644 --- a/src/app/modules/demo/controller/app/goods.ts +++ b/src/app/modules/demo/controller/app/goods.ts @@ -1,8 +1,8 @@ import { Get, Inject, Post, Provide } from '@midwayjs/decorator'; import { CoolController, BaseController, CoolUrlTag } from '@cool-midway/core'; -import { IQueue } from '@cool-midway/queue'; import { DemoGoodsEntity } from '../../entity/goods'; import { DemoGoodsService } from '../../service/goods'; +import { DemoQueue } from '../../queue/demo'; /** * 商品 @@ -30,7 +30,7 @@ export class DemoAppGoodsController extends BaseController { // 队列 @Inject() - demoQueue: IQueue; + demoQueue: DemoQueue; /** * 请求所有数据 @@ -47,12 +47,7 @@ export class DemoAppGoodsController extends BaseController { */ @Post('/queue', { summary: '发送队列数据' }) async queue() { - this.demoQueue.queue.add( - { a: 1 }, - { - removeOnComplete: true, - removeOnFail: true, - } - ); + console.log(111, this.demoQueue); + this.demoQueue.add({ a: 2 }, { delay: 2000 }); } } diff --git a/src/app/modules/demo/queue/demo.ts b/src/app/modules/demo/queue/demo.ts index d683acf..6a97877 100644 --- a/src/app/modules/demo/queue/demo.ts +++ b/src/app/modules/demo/queue/demo.ts @@ -1,13 +1,14 @@ -import { App, Provide } from '@midwayjs/decorator'; +import { App, Provide, Scope, ScopeEnum } from '@midwayjs/decorator'; import { IMidwayWebApplication } from '@midwayjs/web'; -import { ICoolQueue, Queue } from '@cool-midway/queue'; +import { BaseCoolQueue, Queue } from '@cool-midway/queue'; /** * 任务 */ @Queue() +@Scope(ScopeEnum.Singleton) @Provide() -export abstract class DemoQueue implements ICoolQueue { +export class DemoQueue extends BaseCoolQueue { @App() app: IMidwayWebApplication; diff --git a/src/app/modules/task/queue/task.ts b/src/app/modules/task/queue/task.ts index 25c9857..37f266d 100644 --- a/src/app/modules/task/queue/task.ts +++ b/src/app/modules/task/queue/task.ts @@ -1,30 +1,30 @@ import { App, Inject, Provide } from '@midwayjs/decorator'; import { IMidwayWebApplication } from '@midwayjs/web'; -import { ICoolQueue, Queue } from '@cool-midway/queue'; +import { BaseCoolQueue, Queue } from '@cool-midway/queue'; import { TaskInfoService } from '../service/info'; +import { Job } from 'bullmq'; /** * 任务 */ @Queue() @Provide() -export abstract class TaskInfoQueue implements ICoolQueue { +export abstract class TaskInfoQueue extends BaseCoolQueue { @App() app: IMidwayWebApplication; @Inject() taskInfoService: TaskInfoService; - async data(job: any, done: any): Promise { + async data(job: Job, done: any): Promise { try { - console.log('收到的数据', job.data); const result = await this.taskInfoService.invokeService(job.data.service); this.taskInfoService.record(job.data, 1, JSON.stringify(result)); } catch (error) { this.taskInfoService.record(job.data, 0, error); } this.taskInfoService.updateNextRunTime(job.data.id); - this.taskInfoService.updateStatus(); + this.taskInfoService.updateStatus(job.data.id); done(); } } diff --git a/src/app/modules/task/service/info.ts b/src/app/modules/task/service/info.ts index c084e92..9a689c7 100644 --- a/src/app/modules/task/service/info.ts +++ b/src/app/modules/task/service/info.ts @@ -4,11 +4,11 @@ import { InjectEntityModel } from '@midwayjs/orm'; import { Repository } from 'typeorm'; import { TaskInfoEntity } from '../entity/info'; import { TaskLogEntity } from '../entity/log'; -import { IQueue } from '@cool-midway/queue'; import { ILogger } from '@midwayjs/logger'; import { IMidwayWebApplication } from '@midwayjs/web'; import * as _ from 'lodash'; import { Utils } from '../../../comm/utils'; +import { TaskInfoQueue } from '../queue/task'; /** * 任务 @@ -25,7 +25,7 @@ export class TaskInfoService extends BaseService { taskLogEntity: Repository; @Inject() - taskInfoQueue: IQueue; + taskInfoQueue: TaskInfoQueue; @App() app: IMidwayWebApplication; @@ -42,9 +42,7 @@ export class TaskInfoService extends BaseService { if (task) { const job = await this.exist(task.id); if (job) { - await this.taskInfoQueue.queue.removeRepeatable( - JSON.parse(task.repeatConf) - ); + await this.taskInfoQueue.removeRepeatable(JSON.parse(task.repeatConf)); } task.status = 0; await this.taskInfoEntity.update(task.id, task); @@ -73,8 +71,8 @@ export class TaskInfoService extends BaseService { async once(id) { const task = await this.taskInfoEntity.findOne({ id }); if (task) { - await this.taskInfoQueue.queue.add(task, { - jobId: task.id, + await this.taskInfoQueue.add(task, { + jobId: task.id.toString(), removeOnComplete: true, removeOnFail: true, }); @@ -86,8 +84,7 @@ export class TaskInfoService extends BaseService { * @param jobId */ async exist(jobId) { - console.log(jobId); - const result = await this.taskInfoQueue.queue.getRepeatableJobs(); + const result = await this.taskInfoQueue.getRepeatableJobs(); const ids = result.map(e => { return e.id; }); @@ -112,26 +109,33 @@ export class TaskInfoService extends BaseService { if (params.status === 1) { const exist = await this.exist(params.id); if (exist) { - await this.taskInfoQueue.queue.removeRepeatable( + await this.taskInfoQueue.removeRepeatable( JSON.parse(params.repeatConf) ); } - const jobOp = Object.assign(params); - await this.utils.removeEmptyP(jobOp); - delete jobOp.repeatConf; - const { opts } = await this.taskInfoQueue.queue.add(params, { + const { every, limit, startDate, endDate, cron } = params; + const repeat = { + every, + limit, + jobId: params.id, + startDate, + endDate, + cron, + }; + await this.utils.removeEmptyP(repeat); + const result = await this.taskInfoQueue.add(params, { jobId: params.id, removeOnComplete: true, removeOnFail: true, - repeat: jobOp, + repeat, }); - if (!opts) { - throw new Error('任务添加失败,可能由于格式不正确~'); + if (!result) { + throw new Error('任务添加失败,请检查任务配置'); } // await transactionalEntityManager.update(TaskInfoEntity, params.id, { // jobId: opts.jobId, // }); - repeatConf = opts; + repeatConf = result.opts; } }); if (params.status === 1) { @@ -159,9 +163,7 @@ export class TaskInfoService extends BaseService { const task = await this.taskInfoEntity.findOne({ id }); const exist = await this.exist(task.id); if (exist) { - await this.taskInfoQueue.queue.removeRepeatable( - JSON.parse(task.repeatConf) - ); + await this.taskInfoQueue.removeRepeatable(JSON.parse(task.repeatConf)); } await this.taskInfoEntity.delete({ id }); await this.taskLogEntity.delete({ taskId: id }); @@ -237,12 +239,10 @@ export class TaskInfoService extends BaseService { */ async getNextRunTime(jobId) { let nextRunTime; - const result = await this.taskInfoQueue.queue.getRepeatableJobs(); - for (const task of result) { - if (task.id === jobId.toString()) { - nextRunTime = new Date(task.next); - break; - } + const result = await this.taskInfoQueue.getRepeatableJobs(); + const task = _.find(result, { id: jobId + '' }); + if (task) { + nextRunTime = new Date(task.next); } return nextRunTime; } @@ -261,25 +261,21 @@ export class TaskInfoService extends BaseService { /** * 刷新任务状态 */ - async updateStatus() { - const result = await this.taskInfoQueue.queue.getRepeatableJobs(); - for (const job of result) { - const task = await this.taskInfoEntity.findOne({ id: job.id }); - if (task) { - setTimeout(async () => { - // 2秒后清空任务 - const nextTime = await this.getNextRunTime(task.id); - if (nextTime && nextTime.getTime() <= new Date().getTime() - 999) { - this.nativeQuery( - 'update task_info a set a.status = ?, a.updateTime = ? where a.id = ?', - [0, new Date(), task.id] - ); - this.taskInfoQueue.queue.removeRepeatable( - JSON.parse(task.repeatConf) - ); - } - }, 2000); + async updateStatus(jobId) { + const result = await this.taskInfoQueue.getRepeatableJobs(); + const job = _.find(result, { id: jobId + '' }); + // @ts-ignore + const task = await this.taskInfoEntity.findOne({ id: job.id }); + const nextTime = await this.getNextRunTime(task.id); + if (task) { + if (task.nextRunTime.getTime() == nextTime.getTime()) { + task.status = 0; + task.nextRunTime = nextTime; + this.taskInfoQueue.removeRepeatableByKey(job.key); + } else { + task.nextRunTime = nextTime; } + await this.taskInfoEntity.update(task.id, task); } } diff --git a/src/config/config.local.ts b/src/config/config.local.ts index 2457171..e9afc6e 100644 --- a/src/config/config.local.ts +++ b/src/config/config.local.ts @@ -10,12 +10,12 @@ export default (appInfo: EggAppInfo) => { host: '127.0.0.1', port: 3306, username: 'root', - password: '123456', + password: '123123', database: 'cool', // 自动建表 注意:线上部署的时候不要使用,有可能导致数据丢失 synchronize: true, // 打印日志 - logging: true, + logging: false, // 字符集 charset: 'utf8mb4', // 驱动 @@ -30,5 +30,15 @@ export default (appInfo: EggAppInfo) => { }, }; + config.cool = { + // redis为插件名称 + redis: { + host: '127.0.0.1', + password: '', + port: 6379, + db: 0, + }, + }; + return config; }; diff --git a/src/configuration.ts b/src/configuration.ts index 8346f47..9e164c9 100644 --- a/src/configuration.ts +++ b/src/configuration.ts @@ -6,8 +6,8 @@ import * as orm from '@midwayjs/orm'; import * as cool from '@cool-midway/core'; // import * as wxpay from '@cool-midway/wxpay'; import * as oss from '@cool-midway/oss'; -// import * as redis from '@cool-midway/redis'; -// import * as queue from '@cool-midway/queue'; +import * as redis from '@cool-midway/redis'; +import * as queue from '@cool-midway/queue'; // import * as alipay from '@cool-midway/alipay'; // import * as socket from '@cool-midway/socket'; @@ -23,9 +23,9 @@ import * as oss from '@cool-midway/oss'; // oss插件,需要到后台配置之后才有用,默认是本地上传 oss, // 将缓存替换成redis - //redis, + redis, // 队列 - //queue, + queue, // 微信支付 //wxpay, // 支付宝支付