您的位置:首页 > 财经 > 产业 > NestJs bull 用法

NestJs bull 用法

2024/11/16 13:21:42 来源:https://blog.csdn.net/qq_39517116/article/details/141730290  浏览:    关键词:NestJs bull 用法

bull简介

队列 bull

bull用法

https://github.com/OptimalBits/bull

Bull is currently in maintenance mode, we are only fixing bugs. For new features check BullMQ, a modern rewritten implementation in Typescript. You are still very welcome to use Bull if it suits your needs, which is a safe, battle tested librarySourceURL:https://github.com/OptimalBits/bull?tab=readme-ov-file 公牛目前处于维护模式,我们只是修复错误。有关新特性,请检查 BullMQ,这是一个用 Typecript 重写的现代实现。如果符合您的需要,您仍然非常欢迎使用 Bull,这是一个安全的、经过战斗考验的库。

真正用法用bullMq参考bullMq

bull参考bull

不过概念理解知识内容,多参考bullMq,文档更加丰富

https://github.com/taskforcesh/bullmq

What is BullMQ | BullMQ

@Injectable()
export class QueueConfigurationService implements OnApplicationBootstrap {constructor() {}private queueRegistry = new Map<string, Queue>();async onApplicationBootstrap() {const redisQueueConfig = {host: process.env['REDIS_HOST'],port: Number(process.env['REDIS_PORT']),password: process.env['REDIS_PASSWORD'],connectionTimeout: 30000,db: 4,};try {const standardQueue = new Queue('StandardQueue', { redis: redisQueueConfig });this.queueRegistry.set('StandardQueue', standardQueue);} catch (error) {throw error;}}async getQueueByLabel(queueLabel: string): Promise<Queue | undefined> {return this.queueRegistry.get(queueLabel);}
}

// 在其他地方使用
this.taskQueue = new Queue(serviceQueueName, {redis: {host: process.env['REDIS_HOST'],port: Number(process.env['REDIS_PORT']),password: process.env['REDIS_PASSWORD'],connectTimeout: 30000,db: 4,},
});
this.taskQueue.process(taskSubscriptionName, async (job, done) => {});

Nestjs bull用法

Documentation | NestJS - A progressive Node.js framework

import { Injectable, Inject } from '@nestjs/common';
import { InjectRedis } from 'nestjs-redis';
import { InjectQueue, BullModule } from '@nestjs/bull';
import { BaseService } from './base.service';
import { TypeOrmModule } from '@nestjs/typeorm';// 常量定义
const NORMAL_QUEUE_NAME = 'normal_queue';
const PRIORITY_QUEUE_NAME = 'priority_queue';
const TEST_QUEUE_NAME = 'test_queue';// 实现 OnApplicationBootstrap 接口
@Injectable()
export class QueueConfigService {// 构造函数constructor(@InjectRedis('redisDistribute') private readonly redisDistribute: any,@InjectQueue(NORMAL_QUEUE_NAME) private readonly normalQueue: any,@InjectQueue(PRIORITY_QUEUE_NAME) private readonly priorityQueue: any,@InjectQueue(TEST_QUEUE_NAME) private readonly testQueue: any,private readonly base: BaseService) {}// 根据队列名称返回对应的队列实例getQueueByName(queueName: string): any {const queueMap = {[NORMAL_QUEUE_NAME]: this.normalQueue,[PRIORITY_QUEUE_NAME]: this.priorityQueue,[TEST_QUEUE_NAME]: this.testQueue,};return queueMap[queueName];}
}

// 模块定义
@Module({imports: [BullModule.registerQueueAsync([{name: NORMAL_QUEUE_NAME,useFactory: () => ({redis: {host: process.env['REDIS_HOST'],port: +process.env['REDIS_PORT'],password: process.env['REDIS_PASSWORD'],},}),},{name: PRIORITY_QUEUE_NAME,useFactory: () => ({redis: {host: process.env['REDIS_HOST'],port: +process.env['REDIS_PORT'],password: process.env['REDIS_PASSWORD'],},}),},{name: TEST_QUEUE_NAME,useFactory: () => ({redis: {host: process.env['REDIS_HOST'],port: +process.env['REDIS_PORT'],password: process.env['REDIS_PASSWORD'],},}),},]),BaseModule,],providers: [QueueConfigService],exports: [QueueConfigService],
})
export class QueueConfigModule {}

queueMap[TEST_QUEUE_NAME].add('test', 20);

@Processor('TEST_QUEUE_NAME')
export class QueneService {constructor(@InjectQueue('task') private readonly taskQueue: Queue) {}@Process('test')async processTask(job: Job<number>) {console.log('Processing', job);console.log('Processing done', job.id);}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com