@nestjs/bull
和 bull
包npm install --save @nestjs/bull bull
AppModule
中导入BullModule
注意, 这个注册方法和官网上有点不一样. 这里直接用到 forRoot
与 官网的 registerQueue
不一样, 这个 registerQueue
我放到模块里面注册列表去了, 这里只做 redis 数据库的连接
BullModule.forRoot({
redis: {
host: '127.0.0.1',
port: 6379,
password: '123456', // 如果 redis 有密码就写, 没有就不用.
db: 0 // 连接的那个库
}
})
QueueModule
做队列的事, 做统一管理
ts复制代码
import {Global,Module }from '@nestjs/common';
import {BullModule }from '@nestjs/bull';
import {QueueService }from './queue.service';
import {FlowProcessor }from './processor/flow.processor';
import {QueueName }from '@/utils/enum';
import {QueueController }from './queue.controller';
import {CreateProcessor }from './processor/create.processor';
@Global()
@Module({
imports: [
BullModule.registerQueue({ name:QueueName.create }, { name:QueueName.flow }),
controllers: [QueueController],
providers: [CreateProcessor,FlowProcessor,QueueService],
exports: [QueueService]
})
exportclassQueueModule { }
在 QueueModule
注册队名就行了, 数据库连接不用管.
BullModule.registerQueue({ name: QueueName.create }, { name: QueueName.flow })
name 是个 string, 我为了统一处理, 用了 emum 格式.
providers 里, 在加入消费者, 也就是干活那个类 CreateProcessor, 一个队列指定一个消费者
// 这个 QueueName.create 就是邦定列表
@Processor(QueueName.create)
export class CreateProcessor {
constructor(
private readonly messageService: MessageService,
private readonly emailService: EmailService,
private readonly wsService: WsService,
private readonly trackingService: TrackingService,
) { }
// 这里可以做指定那个方法做事.
@Process()
async create(job: Job<any>) {
// 做事在这里.
}
注入到 Service 中, 然后调用就行了
@InjectQueue(QueueName.flow) private flowQueue: Queue
这个类中, 可以添加任务到队列, 查看队列状态, 重试等等.
import { ProcessorName, QueueName } from '@/utils/enum';
import { InjectQueue } from '@nestjs/bull';
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { Queue } from 'bull';
@Injectable()
export class QueueService implements OnModuleInit {
constructor(
@InjectQueue(QueueName.flow) private flowQueue: Queue,
@InjectQueue(QueueName.create) private createQueue: Queue,
) { }
/**
* 创建事务队列
* @param data 处理的数据
* @returns jobId
*/
async createForQueue(data: any): Promise<any> {
try {
const job = await this.createQueue.add(data, {
attempts: 3, // 允许最大重试次数
backoff: 2000, // 重试之间的延迟时间(毫秒)
removeOnComplete: true, // 任务成功后自动从队列中移除
});
return job;
} catch (error) {
Logger.error('createForQueue', error)
return null
}
}
/**
* 处理事务队列
* @param data 处理的数据
* @returns jobId
*/
async flowForQueue(data: any): Promise<any> {
try {
const job = await this.flowQueue.add(data, {
attempts: 3, // 允许最大重试次数
backoff: 2000, // 重试之间的延迟时间(毫秒)
removeOnComplete: true, // 任务成功后自动从队列中移除
});
return job;
} catch (error) {
Logger.error('flowForQueue', error)
return null
}
}
/**
* Job 状态
* @param jobId 队列id
* @param queueName 队名
* @returns
*/
async getJobStatus(jobId: string, queueName: QueueName): Promise<string> {
let job
switch (queueName) {
case QueueName.create:
job = await this.createQueue.getJob(jobId)
break;
default:
job = await this.flowQueue.getJob(jobId);
break;
}
if (!job) {
return 'Job not found';
}
const status = await job.getState();
return status;
}
/**
* 重新提交到队列
* @param jobId 队列id
* @param queueName 队名
*/
async retryJob(jobId: string, queueName: QueueName): Promise<void> {
let job
switch (queueName) {
case QueueName.create:
job = await this.createQueue.getJob(jobId)
break;
default:
job = await this.flowQueue.getJob(jobId);
break;
}
if (job) {
await job.retry();
}
}
}