完善队列,新增主动消费

This commit is contained in:
ap 2021-12-31 15:30:12 +08:00
parent b23f5a3375
commit 28f624420a
6 changed files with 61 additions and 23 deletions

View File

@ -2,15 +2,16 @@
"queue": {
"prefix": "queue",
"body": [
"import { Provide } from '@midwayjs/decorator';",
"import { ICoolQueue, Queue } from '@cool-midway/queue';",
"import { Provide, Scope, ScopeEnum } from '@midwayjs/decorator';",
"import { BaseCoolQueue, Queue } from '@cool-midway/queue';",
"",
"/**",
" * 队列",
" */",
"@Queue()",
"@Scope(ScopeEnum.Singleton)",
"@Provide()",
"export abstract class xxxQueue implements ICoolQueue {",
"export abstract class xxxQueue extends BaseCoolQueue {",
" data(job: any, done: any): void {",
" console.log('收到的数据', job.data);",
" done();",

View File

@ -8,7 +8,7 @@
"@cool-midway/core": "^4.0.15",
"@cool-midway/es": "^4.0.2",
"@cool-midway/oss": "^4.0.3",
"@cool-midway/queue": "/Users/mac/Documents/src/cool/admin/midway-core/queue/dist/",
"@cool-midway/queue": "^4.1.0",
"@cool-midway/redis": "^4.0.5",
"@cool-midway/socket": "^4.0.2",
"@cool-midway/wxpay": "^4.0.3",
@ -37,7 +37,7 @@
"@midwayjs/mock": "^2.14.0",
"@types/jest": "^27.0.3",
"@types/jsonwebtoken": "^8.5.6",
"@types/node": "16",
"@types/node": "14",
"cross-env": "^7.0.3",
"jest": "^27.4.3",
"mwts": "^1.3.0",
@ -48,7 +48,7 @@
"node": ">=12.0.0"
},
"scripts": {
"start": "egg-scripts start --daemon --title=cool-admin-midway --framework=@midwayjs/web --port=8001 --sticky",
"start": "egg-scripts start --title=cool-admin-midway --framework=@midwayjs/web --port=8001",
"stop": "egg-scripts stop --title=cool-admin-midway",
"start_build": "npm run build && cross-env NODE_ENV=development midway-bin dev",
"docker": "egg-scripts start --title=cool-admin-midway --framework=@midwayjs/web --sticky",

View File

@ -1,8 +1,7 @@
import { Get, Inject, Post, Provide } from '@midwayjs/decorator';
import { Get, Inject, Provide } from '@midwayjs/decorator';
import { CoolController, BaseController, CoolUrlTag } from '@cool-midway/core';
import { DemoGoodsEntity } from '../../entity/goods';
import { DemoGoodsService } from '../../service/goods';
import { DemoQueue } from '../../queue/demo';
/**
*
@ -28,10 +27,6 @@ export class DemoAppGoodsController extends BaseController {
@Inject()
demoGoodsService: DemoGoodsService;
// 队列
@Inject()
demoQueue: DemoQueue;
/**
*
* @returns
@ -41,13 +36,4 @@ export class DemoAppGoodsController extends BaseController {
async all() {
return this.ok(await this.demoGoodsService.all());
}
/**
*
*/
@Post('/queue', { summary: '发送队列数据' })
async queue() {
console.log(111, this.demoQueue);
this.demoQueue.add({ a: 2 }, { delay: 2000 });
}
}

View File

@ -0,0 +1,39 @@
import { Get, Inject, Post, Provide } from '@midwayjs/decorator';
import { CoolController, BaseController } from '@cool-midway/core';
import { DemoCommQueue } from '../../queue/comm';
import { DemoGetterQueue } from '../../queue/getter';
/**
*
*/
@Provide()
@CoolController()
export class DemoQueueController extends BaseController {
// 普通队列
@Inject()
demoCommQueue: DemoCommQueue;
// 主动消费队列
@Inject()
demoGetterQueue: DemoGetterQueue;
/**
*
*/
@Post('/add', { summary: '发送队列数据' })
async queue() {
this.demoCommQueue.add({ a: 2 });
return this.ok();
}
/**
* getter时有效
*/
@Get('/getter')
async getter() {
const job = await this.demoCommQueue.getters.getJobs(['wait'], 0, 0, true);
// 获得完将数据从队列移除
await job[0].remove();
return this.ok(job[0].data);
}
}

View File

@ -3,18 +3,20 @@ import { IMidwayWebApplication } from '@midwayjs/web';
import { BaseCoolQueue, Queue } from '@cool-midway/queue';
/**
*
*
*/
@Queue()
@Scope(ScopeEnum.Singleton)
@Provide()
export class DemoQueue extends BaseCoolQueue {
export class DemoCommQueue extends BaseCoolQueue {
@App()
app: IMidwayWebApplication;
async data(job: any, done: any): Promise<void> {
// 这边可以执行定时任务具体的业务或队列的业务
console.log('数据', job.data);
// 抛出错误 可以让队列重试默认重试5次
//throw new Error('错误');
done();
}
}

View File

@ -0,0 +1,10 @@
import { Provide, Scope, ScopeEnum } from '@midwayjs/decorator';
import { BaseCoolQueue, Queue } from '@cool-midway/queue';
/**
*
*/
@Queue()
@Scope(ScopeEnum.Singleton)
@Provide()
export class DemoGetterQueue extends BaseCoolQueue {}