mirror of
https://github.com/cool-team-official/cool-admin-midway.git
synced 2025-12-10 16:12:50 +00:00
383 lines
9.9 KiB
Plaintext
383 lines
9.9 KiB
Plaintext
---
|
||
description: 任务与队列(Task)
|
||
globs:
|
||
---
|
||
# 任务与队列(Task)
|
||
|
||
## 内置任务(代码中配置)
|
||
|
||
内置定时任务能力来自于[midwayjs](https://www.midwayjs.org/docs/extensions/cron)
|
||
|
||
### 引入组件
|
||
|
||
```ts
|
||
import { Configuration } from "@midwayjs/core";
|
||
import * as cron from "@midwayjs/cron"; // 导入模块
|
||
import { join } from "path";
|
||
|
||
@Configuration({
|
||
imports: [cron],
|
||
importConfigs: [join(__dirname, "config")],
|
||
})
|
||
export class AutoConfiguration {}
|
||
```
|
||
|
||
### 使用
|
||
|
||
```ts
|
||
import { Job, IJob } from "@midwayjs/cron";
|
||
import { FORMAT } from "@midwayjs/core";
|
||
|
||
@Job({
|
||
cronTime: FORMAT.CRONTAB.EVERY_PER_30_MINUTE,
|
||
start: true,
|
||
})
|
||
export class DataSyncCheckerJob implements IJob {
|
||
async onTick() {
|
||
// ...
|
||
}
|
||
}
|
||
```
|
||
|
||
```ts
|
||
@Job("syncJob", {
|
||
cronTime: "*/2 * * * * *", // 每隔 2s 执行
|
||
})
|
||
export class DataSyncCheckerJob implements IJob {
|
||
async onTick() {
|
||
// ...
|
||
}
|
||
}
|
||
```
|
||
|
||
### 规则 cron
|
||
|
||
```ts
|
||
* * * * * *
|
||
┬ ┬ ┬ ┬ ┬ ┬
|
||
│ │ │ │ │ |
|
||
│ │ │ │ │ └ day of week (0 - 7) (0 or 7 is Sun)
|
||
│ │ │ │ └───── month (1 - 12)
|
||
│ │ │ └────────── day of month (1 - 31)
|
||
│ │ └─────────────── hour (0 - 23)
|
||
│ └──────────────────── minute (0 - 59)
|
||
└───────────────────────── second (0 - 59, optional)
|
||
|
||
```
|
||
|
||
::: warning 警告
|
||
|
||
注意:该方式在多实例部署的情况下无法做到任务之前的协同,任务存在重复执行的可能
|
||
|
||
:::
|
||
|
||
## 本地任务(管理后台配置,v8.0 新增)
|
||
|
||
可以到登录后台`/系统管理/任务管理/任务列表`,配置任务。默认是不需要任何依赖的, 旧版需要依赖`redis`才能使用该功能。
|
||
|
||
### 配置任务
|
||
|
||
配置完任务可以调用你配置的 service 方法,如:taskDemoService.test()
|
||
|
||
### 规则 cron
|
||
|
||
规则 cron
|
||
|
||
```ts
|
||
* * * * * *
|
||
┬ ┬ ┬ ┬ ┬ ┬
|
||
│ │ │ │ │ |
|
||
│ │ │ │ │ └ day of week (0 - 7) (0 or 7 is Sun)
|
||
│ │ │ │ └───── month (1 - 12)
|
||
│ │ │ └────────── day of month (1 - 31)
|
||
│ │ └─────────────── hour (0 - 23)
|
||
│ └──────────────────── minute (0 - 59)
|
||
└───────────────────────── second (0 - 59, optional)
|
||
|
||
```
|
||
|
||
规则示例:
|
||
|
||
- 每 5 秒执行一次: `*/5 * * * * *`
|
||
- 每 5 分钟执行一次: `*/5 * * * *`
|
||
- 每小时执行一次: `0 * * * *`
|
||
- 每天执行一次: `0 0 * * *`
|
||
- 每天 1 点执行: `0 1 * * *`
|
||
- 每周执行一次: `0 0 * * 0`
|
||
- 每月执行一次: `0 0 1 * *`
|
||
|
||

|
||
|
||
## 分布式任务(管理后台配置)
|
||
|
||
当需要分布式部署时,需要开启分布式任务,通过 redis 作为协同整个集群的任务,防止任务重复执行等异常情况。
|
||
|
||
#### 引入插件
|
||
|
||
`src/configuration.ts`
|
||
|
||
```ts
|
||
import { Configuration, App } from "@midwayjs/core";
|
||
import { join } from "path";
|
||
import * as task from "@cool-midway/task";
|
||
|
||
@Configuration({
|
||
imports: [task],
|
||
importConfigs: [join(__dirname, "./config")],
|
||
})
|
||
export class ContainerLifeCycle {
|
||
@App()
|
||
app: koa.Application;
|
||
|
||
async onReady() {}
|
||
}
|
||
```
|
||
|
||
#### 配置
|
||
|
||
[redis>=5.x](https://redis.io/),推荐[redis>=7.x](https://redis.io/)
|
||
|
||
`src/config/config.default.ts`
|
||
|
||
::: warning 注意
|
||
很多人忽略了这个配置,导致项目包 redis 连接错误!!!
|
||
:::
|
||
|
||
```ts
|
||
import { CoolFileConfig, MODETYPE } from "@cool-midway/file";
|
||
import { MidwayConfig } from "@midwayjs/core";
|
||
import * as fsStore from "cache-manager-fs-hash";
|
||
|
||
export default {
|
||
// 修改成你自己独有的key
|
||
keys: "cool-admin for node",
|
||
koa: {
|
||
port: 8001,
|
||
},
|
||
// cool配置
|
||
cool: {
|
||
redis: {
|
||
host: "127.0.0.1",
|
||
port: 6379,
|
||
password: "",
|
||
db: 0,
|
||
},
|
||
},
|
||
} as unknown as MidwayConfig;
|
||
```
|
||
|
||
redis cluster 方式
|
||
|
||
```ts
|
||
[
|
||
{
|
||
host: "192.168.0.103",
|
||
port: 7000,
|
||
},
|
||
{
|
||
host: "192.168.0.103",
|
||
port: 7001,
|
||
},
|
||
{
|
||
host: "192.168.0.103",
|
||
port: 7002,
|
||
},
|
||
{
|
||
host: "192.168.0.103",
|
||
port: 7003,
|
||
},
|
||
{
|
||
host: "192.168.0.103",
|
||
port: 7004,
|
||
},
|
||
{
|
||
host: "192.168.0.103",
|
||
port: 7005,
|
||
},
|
||
];
|
||
```
|
||
|
||
### 创建执行任务的 service
|
||
|
||
```ts
|
||
import { Provide } from "@midwayjs/core";
|
||
import { BaseService } from "@cool-midway/core";
|
||
/**
|
||
* 任务执行的demo示例
|
||
*/
|
||
@Provide()
|
||
export class DemoTaskService extends BaseService {
|
||
/**
|
||
* 测试任务执行
|
||
* @param params 接收的参数 数组 [] 可不传
|
||
*/
|
||
async test(params?: []) {
|
||
// 需要登录后台任务管理配置任务
|
||
console.log("任务执行了", params);
|
||
}
|
||
}
|
||
```
|
||
|
||
### 配置定时任务
|
||
|
||
登录后台 任务管理/任务列表
|
||
|
||

|
||
|
||
::: warning
|
||
截图中的 demoTaskService 为上一步执行任务的 service 的实例 ID,midwayjs 默认为类名首字母小写!!!
|
||
|
||
任务调度基于 redis,所有的任务都需要通过代码去维护任务的创建,启动,暂停。 所以直接改变数据库的任务状态是无效的,redis 中的信息还未清空, 任务将继续执行。
|
||
:::
|
||
|
||
## 队列
|
||
|
||
之前的分布式任务调度,其实是利用了[bullmq](https://docs.bullmq.io/)的重复队列机制。
|
||
|
||
在项目开发过程中特别是较大型、数据量较大、业务较复杂的场景下往往需要用到队列。 如:抢购、批量发送消息、分布式事务、订单 2 小时后失效等。
|
||
|
||
得益于[bullmq](https://docs.bullmq.io/),cool 的队列也支持`延迟`、`重复`、`优先级`等高级特性。
|
||
|
||
### 创建队列
|
||
|
||
一般放在名称为 queue 文件夹下
|
||
|
||
#### 普通队列
|
||
|
||
普通队列数据由消费者自动消费,必须重写 data 方法用于被动消费数据。
|
||
|
||
`src/modules/demo/queue/comm.ts`
|
||
|
||
```ts
|
||
import { BaseCoolQueue, CoolQueue } from "@cool-midway/task";
|
||
import { IMidwayApplication } from "@midwayjs/core";
|
||
import { App } from "@midwayjs/core";
|
||
|
||
/**
|
||
* 普通队列
|
||
*/
|
||
@CoolQueue()
|
||
export class DemoCommQueue extends BaseCoolQueue {
|
||
@App()
|
||
app: IMidwayApplication;
|
||
|
||
async data(job: any, done: any): Promise<void> {
|
||
// 这边可以执行定时任务具体的业务或队列的业务
|
||
console.log("数据", job.data);
|
||
// 抛出错误 可以让队列重试,默认重试5次
|
||
//throw new Error('错误');
|
||
done();
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 主动队列
|
||
|
||
主动队列数据由消费者主动消费
|
||
|
||
`src/modules/demo/queue/getter.ts`
|
||
|
||
```ts
|
||
import { BaseCoolQueue, CoolQueue } from "@cool-midway/task";
|
||
|
||
/**
|
||
* 主动消费队列
|
||
*/
|
||
@CoolQueue({ type: "getter" })
|
||
export class DemoGetterQueue extends BaseCoolQueue {}
|
||
```
|
||
|
||
主动消费数据
|
||
|
||
```ts
|
||
// 主动消费队列
|
||
@Inject()
|
||
demoGetterQueue: DemoGetterQueue;
|
||
|
||
const job = await this.demoGetterQueue.getters.getJobs(['wait'], 0, 0, true);
|
||
// 获得完将数据从队列移除
|
||
await job[0].remove();
|
||
```
|
||
|
||
### 发送数据
|
||
|
||
```ts
|
||
import { Get, Inject, Post, Provide } from "@midwayjs/core";
|
||
import { CoolController, BaseController } from "@cool-midway/core";
|
||
import { DemoCommQueue } from "../../queue/comm";
|
||
import { DemoGetterQueue } from "../../queue/getter";
|
||
|
||
/**
|
||
* 队列
|
||
*/
|
||
@Provide()
|
||
@CoolController()
|
||
export class DemoQueueController extends BaseController {
|
||
// 普通队列
|
||
@Inject()
|
||
demoCommQueue: DemoCommQueue;
|
||
|
||
// 主动消费队列
|
||
@Inject()
|
||
demoGetterQueue: DemoGetterQueue;
|
||
|
||
/**
|
||
* 发送数据到队列
|
||
*/
|
||
@Post("/add", { summary: "发送队列数据" })
|
||
async queue() {
|
||
this.demoCommQueue.add({ a: 2 });
|
||
return this.ok();
|
||
}
|
||
|
||
/**
|
||
* 获得队列中的数据,只有当队列类型为getter时有效
|
||
*/
|
||
@Get("/getter")
|
||
async getter() {
|
||
const job = await this.demoCommQueue.getters.getJobs(["wait"], 0, 0, true);
|
||
// 获得完将数据从队列移除
|
||
await job[0].remove();
|
||
return this.ok(job[0].data);
|
||
}
|
||
}
|
||
```
|
||
|
||
队列配置
|
||
|
||
```ts
|
||
interface JobOpts {
|
||
priority: number; // Optional priority value. ranges from 1 (highest priority) to MAX_INT (lowest priority). Note that
|
||
// using priorities has a slight impact on performance, so do not use it if not required.
|
||
|
||
delay: number; // An amount of milliseconds to wait until this job can be processed. Note that for accurate delays, both
|
||
// server and clients should have their clocks synchronized. [optional].
|
||
|
||
attempts: number; // The total number of attempts to try the job until it completes.
|
||
|
||
repeat: RepeatOpts; // Repeat job according to a cron specification.
|
||
|
||
backoff: number | BackoffOpts; // Backoff setting for automatic retries if the job fails, default strategy: `fixed`
|
||
|
||
lifo: boolean; // if true, adds the job to the right of the queue instead of the left (default false)
|
||
timeout: number; // The number of milliseconds after which the job should be fail with a timeout error [optional]
|
||
|
||
jobId: number | string; // Override the job ID - by default, the job ID is a unique
|
||
// integer, but you can use this setting to override it.
|
||
// If you use this option, it is up to you to ensure the
|
||
// jobId is unique. If you attempt to add a job with an id that
|
||
// already exists, it will not be added.
|
||
|
||
removeOnComplete: boolean | number; // If true, removes the job when it successfully
|
||
// completes. A number specified the amount of jobs to keep. Default behavior is to keep the job in the completed set.
|
||
|
||
removeOnFail: boolean | number; // If true, removes the job when it fails after all attempts. A number specified the amount of jobs to keep
|
||
// Default behavior is to keep the job in the failed set.
|
||
stackTraceLimit: number; // Limits the amount of stack trace lines that will be recorded in the stacktrace.
|
||
}
|
||
```
|
||
|
||
::: tip
|
||
this.demoQueue.queue 获得的就是 bull 实例,更多 bull 的高级用户可以查看[bull 文档](https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md)
|
||
:::
|