1. bull 用的是 Redis 的队列服务, 所以要安装 redis 数据库, 安装方法看官网.

2. 安装 @nestjs/bull 和 bull

npm install --save @nestjs/bull bull

3. 根AppModule中导入BullModule

注意, 这个注册方法和官网上有点不一样. 这里直接用到 forRoot 与 官网的 registerQueue 不一样, 这个 registerQueue 我放到模块里面注册列表去了, 这里只做 redis 数据库的连接

BullModule.forRoot({
  redis: {
    host: '127.0.0.1',
    port: 6379,
    password: '123456',  // 如果 redis 有密码就写, 没有就不用.
    db: 0                // 连接的那个库
  }
})

4. 可以添加一个 QueueModule 做队列的事, 做统一管理


ts复制代码
import {Global,Module }from '@nestjs/common';
import {BullModule }from '@nestjs/bull';
import {QueueService }from './queue.service';
import {FlowProcessor }from './processor/flow.processor';
import {QueueName }from '@/utils/enum';
import {QueueController }from './queue.controller';
import {CreateProcessor }from './processor/create.processor';
@Global()
@Module({
  imports: [
BullModule.registerQueue({ name:QueueName.create }, { name:QueueName.flow }),
  controllers: [QueueController],
  providers: [CreateProcessor,FlowProcessor,QueueService],
  exports: [QueueService]
})
exportclassQueueModule { }

在 QueueModule 注册队名就行了, 数据库连接不用管.

BullModule.registerQueue({ name: QueueName.create }, { name: QueueName.flow })

name 是个 string, 我为了统一处理, 用了 emum 格式.

5. 消费者

providers 里, 在加入消费者, 也就是干活那个类 CreateProcessor, 一个队列指定一个消费者

// 这个 QueueName.create 就是邦定列表
@Processor(QueueName.create)  
export class CreateProcessor {
  constructor(
    private readonly messageService: MessageService,
    private readonly emailService: EmailService,
    private readonly wsService: WsService,
    private readonly trackingService: TrackingService,
  ) { }
  // 这里可以做指定那个方法做事.
  @Process() 
  async create(job: Job<any>) { 
      // 做事在这里.
  }

6. 生产者 QueueService

注入到 Service 中, 然后调用就行了

@InjectQueue(QueueName.flow) private flowQueue: Queue

这个类中, 可以添加任务到队列, 查看队列状态, 重试等等.

import { ProcessorName, QueueName } from '@/utils/enum';
import { InjectQueue } from '@nestjs/bull';
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { Queue } from 'bull';

@Injectable()
export class QueueService implements OnModuleInit {
    constructor(
        @InjectQueue(QueueName.flow) private flowQueue: Queue,
        @InjectQueue(QueueName.create) private createQueue: Queue,
    ) { }
       /**
     * 创建事务队列
     * @param data 处理的数据
     * @returns jobId
     */
    async createForQueue(data: any): Promise<any> {
        try {
            const job = await this.createQueue.add(data, {
                attempts: 3,          // 允许最大重试次数
                backoff: 2000,        // 重试之间的延迟时间(毫秒)
                removeOnComplete: true, // 任务成功后自动从队列中移除
            });   
            return job;
        } catch (error) {
            Logger.error('createForQueue', error)
            return null
        }
    }

    /**
     * 处理事务队列
     * @param data 处理的数据
     * @returns jobId
     */
    async flowForQueue(data: any): Promise<any> {
        try {
            const job = await this.flowQueue.add(data, {
                attempts: 3,          // 允许最大重试次数
                backoff: 2000,        // 重试之间的延迟时间(毫秒)
                removeOnComplete: true, // 任务成功后自动从队列中移除
            });
            return job;
        } catch (error) {
            Logger.error('flowForQueue', error)
            return null
        }
    }

    /**
     * Job 状态
     * @param jobId 队列id
     * @param queueName 队名
     * @returns 
     */
    async getJobStatus(jobId: string, queueName: QueueName): Promise<string> {
        let job
        switch (queueName) {
            case QueueName.create:
                job = await this.createQueue.getJob(jobId)
                break;

            default:
                job = await this.flowQueue.getJob(jobId);
                break;
        }

        if (!job) {
            return 'Job not found';
        }
        const status = await job.getState();
        return status;
    }

    /**
     * 重新提交到队列
     * @param jobId 队列id
     * @param queueName 队名
     */
    async retryJob(jobId: string, queueName: QueueName): Promise<void> {
        let job
        switch (queueName) {
            case QueueName.create:
                job = await this.createQueue.getJob(jobId)
                break;

            default:
                job = await this.flowQueue.getJob(jobId);
                break;
        }
        if (job) {
            await job.retry();
        }
    }
}