bull简介
队列 bull
bull用法
https://github.com/OptimalBits/bull
Bull is currently in maintenance mode, we are only fixing bugs. For new features check BullMQ, a modern rewritten implementation in Typescript. You are still very welcome to use Bull if it suits your needs, which is a safe, battle tested librarySourceURL:https://github.com/OptimalBits/bull?tab=readme-ov-file 公牛目前处于维护模式,我们只是修复错误。有关新特性,请检查 BullMQ,这是一个用 Typecript 重写的现代实现。如果符合您的需要,您仍然非常欢迎使用 Bull,这是一个安全的、经过战斗考验的库。
真正用法用bullMq参考bullMq
bull参考bull
不过概念理解知识内容,多参考bullMq,文档更加丰富
https://github.com/taskforcesh/bullmq
What is BullMQ | BullMQ
@Injectable()
export class QueueConfigurationService implements OnApplicationBootstrap {constructor() {}private queueRegistry = new Map<string, Queue>();async onApplicationBootstrap() {const redisQueueConfig = {host: process.env['REDIS_HOST'],port: Number(process.env['REDIS_PORT']),password: process.env['REDIS_PASSWORD'],connectionTimeout: 30000,db: 4,};try {const standardQueue = new Queue('StandardQueue', { redis: redisQueueConfig });this.queueRegistry.set('StandardQueue', standardQueue);} catch (error) {throw error;}}async getQueueByLabel(queueLabel: string): Promise<Queue | undefined> {return this.queueRegistry.get(queueLabel);}
}
// 在其他地方使用
this.taskQueue = new Queue(serviceQueueName, {redis: {host: process.env['REDIS_HOST'],port: Number(process.env['REDIS_PORT']),password: process.env['REDIS_PASSWORD'],connectTimeout: 30000,db: 4,},
});
this.taskQueue.process(taskSubscriptionName, async (job, done) => {});
Nestjs bull用法
Documentation | NestJS - A progressive Node.js framework
import { Injectable, Inject } from '@nestjs/common';
import { InjectRedis } from 'nestjs-redis';
import { InjectQueue, BullModule } from '@nestjs/bull';
import { BaseService } from './base.service';
import { TypeOrmModule } from '@nestjs/typeorm';// 常量定义
const NORMAL_QUEUE_NAME = 'normal_queue';
const PRIORITY_QUEUE_NAME = 'priority_queue';
const TEST_QUEUE_NAME = 'test_queue';// 实现 OnApplicationBootstrap 接口
@Injectable()
export class QueueConfigService {// 构造函数constructor(@InjectRedis('redisDistribute') private readonly redisDistribute: any,@InjectQueue(NORMAL_QUEUE_NAME) private readonly normalQueue: any,@InjectQueue(PRIORITY_QUEUE_NAME) private readonly priorityQueue: any,@InjectQueue(TEST_QUEUE_NAME) private readonly testQueue: any,private readonly base: BaseService) {}// 根据队列名称返回对应的队列实例getQueueByName(queueName: string): any {const queueMap = {[NORMAL_QUEUE_NAME]: this.normalQueue,[PRIORITY_QUEUE_NAME]: this.priorityQueue,[TEST_QUEUE_NAME]: this.testQueue,};return queueMap[queueName];}
}
// 模块定义
@Module({imports: [BullModule.registerQueueAsync([{name: NORMAL_QUEUE_NAME,useFactory: () => ({redis: {host: process.env['REDIS_HOST'],port: +process.env['REDIS_PORT'],password: process.env['REDIS_PASSWORD'],},}),},{name: PRIORITY_QUEUE_NAME,useFactory: () => ({redis: {host: process.env['REDIS_HOST'],port: +process.env['REDIS_PORT'],password: process.env['REDIS_PASSWORD'],},}),},{name: TEST_QUEUE_NAME,useFactory: () => ({redis: {host: process.env['REDIS_HOST'],port: +process.env['REDIS_PORT'],password: process.env['REDIS_PASSWORD'],},}),},]),BaseModule,],providers: [QueueConfigService],exports: [QueueConfigService],
})
export class QueueConfigModule {}
queueMap[TEST_QUEUE_NAME].add('test', 20);
@Processor('TEST_QUEUE_NAME')
export class QueneService {constructor(@InjectQueue('task') private readonly taskQueue: Queue) {}@Process('test')async processTask(job: Job<number>) {console.log('Processing', job);console.log('Processing done', job.id);}
}