- 平滑处理峰值。例如,如果用户可以在任意时间启动资源密集型任务,您可以将这些任务添加到队列中,而不是同步执行它们。然后,您可以让工作进程以受控的方式从队列中提取任务。您可以轻松添加新的队列使用者,以在应用程序扩展时扩展后端任务处理。
- 分解可能阻塞 Node.js 事件循环的单片任务。例如,如果用户请求需要 CPU 密集型工作(如音频转码),您可以将此任务委托给其他进程,从而释放面向用户的进程以保持响应。
- 提供跨各种服务的可靠通信渠道。例如,您可以在一个进程或服务中排队任务(作业),并在另一个进程或服务中使用它们。您可以在任何进程或服务的作业生命周期中完成、出错或其他状态更改时收到通知(通过监听状态事件)。当队列生产者或消费者发生故障时,它们的状态会被保留,任务处理可以在节点重新启动时自动重新启动。
Nest 为 BullMQ 集成提供了 @nestjs/bullmq
包,为 Bull 集成提供了 @nestjs/bull
包。这两个包都是基于各自库的抽象/包装器,由同一团队开发。Bull 目前处于维护模式,团队专注于修复错误,而 BullMQ 正在积极开发,具有现代 TypeScript 实现和一组不同的功能。如果 Bull 满足您的要求,它仍然是一个可靠且久经考验的选择。Nest 包可以轻松地以友好的方式将 BullMQ 或 Bull 队列集成到您的 Nest 应用程序中。
BullMQ 和 Bull 都使用 Redis 来保存作业数据,因此您需要在系统上安装 Redis。由于它们由 Redis 支持,因此您的队列架构可以完全分布式且独立于平台。例如,您可以在 Nest 的一个(或多个)节点上运行一些队列 生产者 和 消费者 和 监听器,并在其他网络节点上的其他 Node.js 平台上运行其他生产者、消费者和监听器。
本章介绍 @nestjs/bullmq
和 @nestjs/bull
包。我们还建议阅读 BullMQ 和 Bull 文档,了解更多背景和具体实施细节。
BullMQ 安装
要开始使用 BullMQ,我们首先安装所需的依赖项。
$ npm install --save @nestjs/bullmq bullmq
安装过程完成后,我们可以将 BullModule
导入到根目录 AppModule
中。
import { Module } from '@nestjs/common'
import { BullModule } from '@nestjs/bullmq'
@Module({
imports: [
BullModule.forRoot({
connection: {
host: 'localhost',
port: 6379,
},
}),
],
})
export class AppModule {}
forRoot()
方法用于注册一个 bullmq
包配置对象,该对象将由应用程序中注册的所有队列使用(除非另有说明)。供您参考,以下是配置对象中的一些属性:
connection: ConnectionOptions
- 用于配置 Redis 连接的选项。有关更多信息,请参阅 Connections。可选。prefix: string
- 所有队列键的前缀。可选。defaultJobOptions: JobOpts
- 用于控制新作业的默认设置的选项。有关更多信息,请参阅 JobOpts。可选。settings: AdvancedSettings
- 高级队列配置设置。这些通常不应更改。有关详细信息,请参阅 AdvancedSettings。可选。
所有选项都是可选的,提供对队列行为的详细控制。这些选项直接传递给 BullMQ Queue
构造函数。此处 了解有关这些选项和其他选项的更多信息。
要注册队列,请导入 BullModule.registerQueue()
动态模块,如下所示:
BullModule.registerQueue({
name: 'audio',
})
通过将多个逗号分隔的配置对象传递给 registerQueue()
方法,创建多个队列。
registerQueue()
方法用于实例化和/或注册队列。队列在连接到具有相同凭据的同一底层 Redis 数据库的模块和进程之间共享。每个队列的名称属性都是唯一的。队列名称既用作注入令牌(用于将队列注入控制器/提供程序),又用作装饰器的参数,以将消费者类和侦听器与队列关联起来。
您还可以覆盖特定队列的一些预配置选项,如下所示:
BullModule.registerQueue({
name: 'audio',
connection: {
port: 6380,
},
})
BullMQ 还支持作业之间的父子关系。此功能允许创建流,其中作业是任意深度树的节点。要了解有关它们的更多信息,请查看此处。
要添加流,您可以执行以下操作:
BullModule.registerFlowProducer({
name: 'flowProducerName',
})
由于作业持久保存在 Redis 中,因此每次实例化特定命名的队列时(例如,当启动/重新启动应用程序时),它都会尝试处理可能存在于上一个未完成会话中的任何旧作业。
每个队列可以有一个或多个生产者、消费者和侦听器。消费者按特定顺序从队列中检索作业:FIFO(默认)、LIFO 或根据优先级。控制队列处理顺序在此处 中讨论。
命名配置
如果您的队列连接到多个不同的 Redis 实例,则可以使用一种称为命名配置的技术。此功能允许您在指定的键下注册多个配置,然后您可以在队列选项中引用这些配置。
例如,假设您有一个额外的 Redis 实例(除默认实例外)供应用程序中注册的几个队列使用,您可以按如下方式注册其配置:
BullModule.forRoot('alternative-config', {
connection: {
port: 6381,
},
})
在上面的例子中,alternative-config
只是一个配置键(可以是任意字符串)。
有了这个,您现在可以在registerQueue()
选项对象中指向此配置:
BullModule.registerQueue({
configKey: 'alternative-config',
name: 'video',
})
生产者
作业生产者将作业添加到队列。生产者通常是应用服务(Nest providers)。要将作业添加到队列,首先将队列注入服务,如下所示:
import { Injectable } from '@nestjs/common'
import { Queue } from 'bullmq'
import { InjectQueue } from '@nestjs/bullmq'
@Injectable()
export class AudioService {
constructor(@InjectQueue('audio') private audioQueue: Queue) {}
}
@InjectQueue()
装饰器通过其名称来标识队列,如 registerQueue()
方法调用中提供的名称(例如 'audio'
)。
现在,通过调用队列的 add()
方法添加作业,并传递用户定义的作业对象。作业表示为可序列化的 JavaScript 对象(因为它们就是这样存储在 Redis 数据库中的)。您传递的作业的形状是任意的;使用它来表示作业对象的语义。您还需要给它一个名字。这允许您创建专门的 consumers,它将只处理具有给定名称的作业。
const job = await this.audioQueue.add('transcode', {
foo: 'bar',
})
作业选项
作业可以有与之关联的其他选项。在 Queue.add()
方法中的 job
参数后传递一个选项对象。一些作业选项属性包括:
priority
:number
- 可选优先级值。范围从 1(最高优先级)到 MAX_INT(最低优先级)。请注意,使用优先级对性能有轻微影响,因此请谨慎使用。delay
:number
- 等待此作业处理的时间量(毫秒)。请注意,为了获得准确的延迟,服务器和客户端都应同步其时钟。attempts
:number
- 尝试作业直至完成的总尝试次数。repeat
:RepeatOpts
- 根据 cron 规范重复作业。请参阅 RepeatOpts。backoff
:number | BackoffOpts
- 如果作业失败,则自动重试的退避设置。请参阅 BackoffOpts。lifo
:boolean
- 如果为 true,则将作业添加到队列的右端而不是左端(默认为 false)。jobId
:number
|string
- 覆盖作业 ID - 默认情况下,作业 ID 是一个唯一的整数,但您可以使用此设置来覆盖它。如果您使用此选项,则由您来确保 jobId 是唯一的。如果您尝试添加具有已存在 ID 的作业,则不会添加该作业。removeOnComplete
:boolean | number
- 如果为 true,则在作业成功完成后删除作业。数字指定要保留的作业数量。默认行为是将作业保留在已完成的集合中。removeOnFail
:boolean | number
- 如果为 true,则在所有尝试后失败时删除该作业。数字指定要保留的作业数量。默认行为是将作业保留在失败的集合中。stackTraceLimit
:number
- 限制将记录在堆栈跟踪中的堆栈跟踪行数。
以下是使用作业选项自定义作业的几个示例。
要延迟作业的启动,请使用 delay
配置属性。
const job = await this.audioQueue.add(
'transcode',
{
foo: 'bar',
},
{ delay: 3000 }, // 3 seconds delayed
)
要将作业添加到队列的右端(将作业处理为LIFO(后进先出)),请将配置对象的lifo
属性设置为true
。
const job = await this.audioQueue.add(
'transcode',
{
foo: 'bar',
},
{ lifo: true },
)
要对作业进行优先排序,请使用priority
属性。
const job = await this.audioQueue.add(
'transcode',
{
foo: 'bar',
},
{ priority: 2 },
)
有关选项的完整列表,请查看 此处 和 此处 的 API 文档。
消费者
消费者是一个类,定义方法,可以处理添加到队列中的作业,或监听队列上的事件,或两者兼而有之。使用 @Processor()
装饰器声明消费者类,如下所示:
import { Processor } from '@nestjs/bullmq'
@Processor('audio')
export class AudioConsumer {}
消费者必须注册为提供者
,以便@nestjs/bullmq
包可以接收它们。
其中装饰器的字符串参数(例如audio
)是与类方法关联的队列的名称。
import { Processor, WorkerHost } from '@nestjs/bullmq'
import { Job } from 'bullmq'
@Processor('audio')
export class AudioConsumer extends WorkerHost {
async process(job: Job<any, any, string>): Promise<any> {
let progress = 0
for (i = 0; i < 100; i++) {
await doSomething(job.data)
progress += 1
await job.progress(progress)
}
return {}
}
}
每当工作器处于空闲状态并且队列中有要处理的作业时,就会调用 process 方法。此处理程序方法接收 job
对象作为其唯一参数。处理程序方法返回的值存储在作业对象中,稍后可以访问,例如在完成事件的侦听器中。
Job
对象有多种方法允许您与其状态进行交互。例如,上面的代码使用 progress()
方法更新作业的进度。请参阅 此处 以获取完整的 Job
对象 API 参考。
在旧版本 Bull 中,您可以指定作业处理程序方法将 仅 处理某种类型的作业(具有特定 name
的作业),方法是将该 name
传递给 @Process()
装饰器,如下所示。
这不适用于 BullMQ,请继续阅读。
@Process('transcode')
async transcode(job: Job<unknown>) { ... }
由于此行为会产生混淆,因此 BullMQ 不支持此行为。相反,您需要切换用例来为每个作业名称调用不同的服务或逻辑:
import { Processor, WorkerHost } from '@nestjs/bullmq'
import { Job } from 'bullmq'
@Processor('audio')
export class AudioConsumer extends WorkerHost {
async process(job: Job<any, any, string>): Promise<any> {
switch (job.name) {
case 'transcode': {
let progress = 0
for (i = 0; i < 100; i++) {
await doSomething(job.data)
progress += 1
await job.progress(progress)
}
return {}
}
case 'concatenate': {
await doSomeLogic2()
break
}
}
}
}
BullMQ 文档的 命名处理器 部分介绍了此内容。
请求范围的消费者
当消费者被标记为请求范围时(详细了解注入范围 此处),将为每个作业专门创建一个类的新实例。作业完成后,该实例将被垃圾回收。
@Processor({
name: 'audio',
scope: Scope.REQUEST,
})
由于请求范围的消费者类是动态实例化的,并且范围限定为单个作业,因此您可以使用标准方法通过构造函数注入JOB_REF
。
constructor(@Inject(JOB_REF) jobRef: Job) {
console.log(jobRef);
}
JOB_REF
标记是从 @nestjs/bullmq
包导入的。
事件监听器
当队列和/或作业状态发生变化时,Bull 会生成一组有用的事件。Nest 提供了 @OnQueueEvent(event)
装饰器,允许订阅一组核心标准事件。
事件监听器必须在 consumer 类中声明(即,在用 @Processor()
装饰器修饰的类中)。要监听事件,请将 @OnQueueEvent(event)
装饰器与要处理的事件一起使用。例如,要监听作业在 audio
队列中进入活动状态时发出的事件,请使用以下构造:
import { Processor, Process, OnQueueEvent } from '@nestjs/bullmq';
import { Job } from 'bullmq';
@Processor('audio')
export class AudioConsumer {
@OnQueueEvent('active')
onActive(job: Job) {
console.log(
`Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
);
}
// ...
您可以在 此处 中看到作为 QueueEventsListener 属性的完整事件列表。
队列管理
队列具有一个 API,允许您执行管理功能,如暂停和恢复、检索各种状态下的作业数量等。您可以在 此处 找到完整的队列 API。直接在 Queue
对象上调用这些方法中的任何一个,如下面暂停/恢复示例所示。
使用 pause()
方法调用暂停队列。暂停的队列在恢复之前不会处理新作业,但当前正在处理的作业将继续,直到它们完成。
await audioQueue.pause()
要恢复暂停的队列,请使用 resume()
方法,如下所示:
await audioQueue.resume()
独立进程
作业处理程序也可以在独立(分叉)进程中运行(来源)。这有几个优点:
- 该进程是沙盒化的,因此如果它崩溃,也不会影响工作器。
- 您可以运行阻塞代码而不影响队列(作业不会停滞)。
- 多核 CPU 的利用率更高。
- 与 redis 的连接更少。
import { join } from 'node:path'
import { Module } from '@nestjs/common'
import { BullModule } from '@nestjs/bullmq'
@Module({
imports: [
BullModule.registerQueue({
name: 'audio',
processors: [join(__dirname, 'processor.js')],
}),
],
})
export class AppModule {}
请注意,由于您的函数正在分叉进程中执行,因此依赖注入(和 IoC 容器)将不可用。这意味着您的处理器函数将需要包含(或创建)它所需的所有外部依赖项实例。
异步配置
您可能希望异步传递 bullmq
选项,而不是静态传递。在这种情况下,使用 forRootAsync()
方法,它提供了几种处理异步配置的方法。同样,如果您想异步传递队列选项,请使用 registerQueueAsync()
方法。
一种方法是使用工厂函数:
BullModule.forRootAsync({
useFactory: () => ({
connection: {
host: 'localhost',
port: 6379,
},
}),
})
我们的工厂行为与任何其他异步提供程序(https://docs.nestjs.com/fundamentals/async-providers)类似(例如,它可以是`异步`的,并且能够通过`inject`注入依赖项)。
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
connection: {
host: configService.get('QUEUE_HOST'),
port: configService.get('QUEUE_PORT'),
},
}),
inject: [ConfigService],
})
或者,您可以使用 useClass
语法:
BullModule.forRootAsync({
useClass: BullConfigService,
})
上面的构造将在 BullModule
内实例化 BullConfigService
,并通过调用 createSharedConfiguration()
使用它来提供选项对象。请注意,这意味着 BullConfigService
必须实现 SharedBullConfigurationFactory
接口,如下所示:
@Injectable()
class BullConfigService implements SharedBullConfigurationFactory {
createSharedConfiguration(): BullModuleOptions {
return {
connection: {
host: 'localhost',
port: 6379,
},
}
}
}
为了防止在 BullModule
内创建 BullConfigService
并使用从不同模块导入的提供程序,您可以使用 useExisting
语法。
BullModule.forRootAsync({
imports: [ConfigModule],
useExisting: ConfigService,
})
此构造的工作原理与 useClass
相同,但有一个关键区别 - BullModule
将查找导入的模块以重用现有的 ConfigService
,而不是实例化新的。
Bull 安装
如果您决定使用 BullMQ,请跳过本节和后续章节。
要开始使用 Bull,我们首先安装所需的依赖项。
$ npm install --save @nestjs/bull bull
安装过程完成后,我们可以将 BullModule
导入根 AppModule
。
import { Module } from '@nestjs/common'
import { BullModule } from '@nestjs/bull'
@Module({
imports: [
BullModule.forRoot({
redis: {
host: 'localhost',
port: 6379,
},
}),
],
})
export class AppModule {}
forRoot()
方法用于注册一个 bull
包配置对象,该对象将由应用程序中注册的所有队列使用(除非另有说明)。配置对象由以下属性组成:
limiter: RateLimiter
- 用于控制处理队列作业的速率的选项。有关更多信息,请参阅 RateLimiter。可选。redis: RedisOpts
- 用于配置 Redis 连接的选项。有关更多信息,请参阅 RedisOpts。可选。prefix: string
- 所有队列键的前缀。可选。defaultJobOptions: JobOpts
- 用于控制新作业的默认设置的选项。有关更多信息,请参阅 JobOpts。可选。settings: AdvancedSettings
- 高级队列配置设置。这些通常不应更改。有关更多信息,请参阅 AdvancedSettings。可选。
所有选项都是可选的,提供对队列行为的详细控制。这些选项直接传递给 Bull Queue
构造函数。在此处 了解有关这些选项的更多信息。
要注册队列,请导入 BullModule.registerQueue()
动态模块,如下所示:
BullModule.registerQueue({
name: 'audio',
})
通过将多个逗号分隔的配置对象传递给 registerQueue()
方法,创建多个队列。
registerQueue()
方法用于实例化和/或注册队列。队列在连接到具有相同凭据的同一底层 Redis 数据库的模块和进程之间共享。每个队列的名称属性都是唯一的。队列名称既用作注入令牌(用于将队列注入控制器/提供程序),又用作装饰器的参数,以将消费者类和侦听器与队列关联起来。
您还可以覆盖特定队列的一些预配置选项,如下所示:
BullModule.registerQueue({
name: 'audio',
redis: {
port: 6380,
},
})
由于作业在 Redis 中持久化,因此每次实例化特定命名队列时(例如,启动/重新启动应用程序时),它都会尝试处理可能存在于上一个未完成会话中的任何旧作业。
每个队列可以有一个或多个生产者、消费者和侦听器。消费者按特定顺序从队列中检索作业:FIFO(默认)、LIFO 或根据优先级。控制队列处理顺序在 此处 中讨论。
命名配置
如果您的队列连接到多个 Redis 实例,则可以使用一种称为 命名配置 的技术。此功能允许您在指定键下注册多个配置,然后您可以在队列选项中引用这些配置。
例如,假设您有一个额外的 Redis 实例(默认实例除外),由应用程序中注册的几个队列使用,您可以按如下方式注册其配置:
BullModule.forRoot('alternative-config', {
redis: {
port: 6381,
},
})
在上面的例子中,alternative-config
只是一个配置键(可以是任意字符串)。
有了这个,你现在可以在registerQueue()
选项对象中指向这个配置:
BullModule.registerQueue({
configKey: 'alternative-config',
name: 'video',
})
生产者
作业生产者将作业添加到队列。生产者通常是应用服务 (Nest providers)。要将作业添加到队列,首先将队列注入服务,如下所示:
import { Injectable } from '@nestjs/common'
import { Queue } from 'bull'
import { InjectQueue } from '@nestjs/bull'
@Injectable()
export class AudioService {
constructor(@InjectQueue('audio') private audioQueue: Queue) {}
}
@InjectQueue()
装饰器通过其名称来标识队列,如 registerQueue()
方法调用中提供的名称(例如 'audio'
)。
现在,通过调用队列的 add()
方法添加作业,并传递用户定义的作业对象。作业以可序列化的 JavaScript 对象表示(因为它们就是这样存储在 Redis 数据库中的)。您传递的作业的形状是任意的;使用它来表示作业对象的语义。
const job = await this.audioQueue.add({
foo: 'bar',
})
命名作业
作业可能具有唯一名称。这允许您创建专门的消费者,该消费者将仅处理具有给定名称的作业。
const job = await this.audioQueue.add('transcode', {
foo: 'bar',
})
使用命名作业时,您必须为添加到队列的每个唯一名称创建处理器,否则队列会抱怨您缺少给定作业的处理器。有关使用命名作业的更多信息,请参阅此处。
作业选项
作业可以具有与之关联的其他选项。在Queue.add()
方法中的job
参数后传递一个选项对象。作业选项属性包括:
priority
:number
- 可选优先级值。范围从 1(最高优先级)到 MAX_INT(最低优先级)。请注意,使用优先级会对性能产生轻微影响,因此请谨慎使用。delay
:number
- 等待此作业处理的时间量(毫秒)。请注意,为了获得准确的延迟,服务器和客户端的时钟都应该同步。attempts
:number
- 尝试作业直至完成的总尝试次数。repeat
:RepeatOpts
- 根据 cron 规范重复作业。请参阅 RepeatOpts。backoff
:number | BackoffOpts
- 如果作业失败,则自动重试的退避设置。请参阅 BackoffOpts。lifo
:boolean
- 如果为 true,则将作业添加到队列的右端而不是左端(默认为 false)。timeout
:number
- 作业应因超时错误而失败的毫秒数。jobId
:number
|string
- 覆盖作业 ID - 默认情况下,作业 ID 是一个唯一的整数,但您可以使用此设置来覆盖它。如果您使用此选项,则由您来确保 jobId 是唯一的。如果您尝试添加具有已存在 ID 的作业,则不会添加该作业。removeOnComplete
:boolean | number
- 如果为 true,则在作业成功完成后删除作业。数字指定要保留的作业数量。默认行为是将作业保留在已完成的集合中。removeOnFail
:boolean | number
- 如果为 true,则在所有尝试后失败时删除该作业。数字指定要保留的作业数量。默认行为是将作业保留在失败的集合中。stackTraceLimit
:number
- 限制将记录在堆栈跟踪中的堆栈跟踪行数。
以下是使用作业选项自定义作业的几个示例。
要延迟作业的启动,请使用 delay
配置属性。
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ delay: 3000 }, // 3 seconds delayed
)
要将作业添加到队列的右端(将作业处理为LIFO(后进先出)),请将配置对象的lifo
属性设置为true
。
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ lifo: true },
)
要对作业进行优先级排序,请使用 priority
属性。
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ priority: 2 },
)
Consumers
消费者是一个类,定义方法,可以处理添加到队列中的作业,或监听队列上的事件,或两者兼而有之。使用 @Processor()
装饰器声明消费者类,如下所示:
import { Processor } from '@nestjs/bull'
@Processor('audio')
export class AudioConsumer {}
消费者必须注册为提供者
,以便@nestjs/bull
包可以接收它们。
其中装饰器的字符串参数(例如audio
)是与类方法关联的队列的名称。
在消费者类中,通过使用@Process()
装饰器装饰处理程序方法来声明作业处理程序。
import { Process, Processor } from '@nestjs/bull'
import { Job } from 'bull'
@Processor('audio')
export class AudioConsumer {
@Process()
async transcode(job: Job<unknown>) {
let progress = 0
for (let i = 0; i < 100; i++) {
await doSomething(job.data)
progress += 1
await job.progress(progress)
}
return {}
}
}
每当工作器处于空闲状态且队列中有要处理的作业时,就会调用修饰方法(例如 transcode()
)。此处理程序方法接收 job
对象作为其唯一参数。处理程序方法返回的值存储在作业对象中,稍后可以访问,例如在完成事件的侦听器中。
Job
对象有多种方法可让您与其状态进行交互。例如,上面的代码使用 progress()
方法来更新作业的进度。请参阅 此处 以获取完整的 Job
对象 API 参考。
您可以指定作业处理程序方法将 仅 处理某种类型的作业(具有特定 name
的作业),方法是将该 name
传递给 @Process()
装饰器,如下所示。您可以在给定的消费者类中拥有多个 @Process()
处理程序,对应于每种作业类型(name
)。当您使用命名作业时,请确保每个名称都有一个对应的处理程序。
@Process('transcode')
async transcode(job: Job<unknown>) { ... }
当为同一队列定义多个使用者时,@Process({{ '{' }} concurrency: 1 {{ '}' }})
中的 concurrency
选项将不起作用。最小 concurrency
将与定义的使用者数量相匹配。即使 @Process()
处理程序使用不同的 name
来处理命名作业,这也适用。
请求范围的使用者
当使用者被标记为请求范围时(了解有关注入范围的更多信息 此处),将为每个作业专门创建该类的新实例。作业完成后,实例将被垃圾收集。
@Processor({
name: 'audio',
scope: Scope.REQUEST,
})
由于请求范围的消费者类是动态实例化的,并且范围限定为单个作业,因此您可以使用标准方法通过构造函数注入 JOB_REF
。
constructor(@Inject(JOB_REF) jobRef: Job) {
console.log(jobRef);
}
JOB_REF
令牌从 @nestjs/bull
包导入。
事件侦听器
当队列和/或作业状态发生变化时,Bull 会生成一组有用的事件。 Nest 提供了一组装饰器,允许订阅一组核心标准事件。 这些是从 @nestjs/bull
包导出的。
事件侦听器必须在 consumer 类中声明(即,在用 @Processor()
装饰器修饰的类中)。要监听事件,请使用下表中的装饰器之一来声明事件的处理程序。例如,要监听作业在音频
队列中进入活动状态时发出的事件,请使用以下构造:
import { Processor, Process, OnQueueActive } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('audio')
export class AudioConsumer {
@OnQueueActive()
onActive(job: Job) {
console.log(
`Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
);
}
由于 Bull 在分布式(多节点)环境中运行,因此它定义了事件局部性的概念。此概念认识到事件可能完全在单个进程内触发,也可能在来自不同进程的共享队列上触发。本地事件是在本地进程中的队列上触发操作或状态更改时产生的事件。换句话说,当您的事件生产者和消费者位于单个进程的本地时,队列上发生的所有事件都是本地的。
当队列在多个进程之间共享时,我们可能会遇到全局事件。要使一个进程中的侦听器接收由另一个进程触发的事件通知,它必须注册全局事件。
每当发出相应的事件时,就会调用事件处理程序。处理程序使用下表中显示的签名调用,提供对与事件相关的信息的访问。我们在下面讨论本地和全局事件处理程序签名之间的一个主要区别。
Local event listeners | Global event listeners | Handler method signature / When fired |
---|---|---|
@OnQueueError() | @OnGlobalQueueError() | handler(error: Error) - An error occurred. error contains the triggering error. |
@OnQueueWaiting() | @OnGlobalQueueWaiting() | handler(jobId: number | string) - A Job is waiting to be processed as soon as a worker is idling. jobId contains the id for the job that has entered this state. |
@OnQueueActive() | @OnGlobalQueueActive() | handler(job: Job) - Job job has started. |
@OnQueueStalled() | @OnGlobalQueueStalled() | handler(job: Job) - Job job has been marked as stalled. This is useful for debugging job workers that crash or pause the event loop. |
@OnQueueProgress() | @OnGlobalQueueProgress() | handler(job: Job, progress: number) - Job job 's progress was updated to value progress . |
@OnQueueCompleted() | @OnGlobalQueueCompleted() | handler(job: Job, result: any) Job job successfully completed with a result result . |
@OnQueueFailed() | @OnGlobalQueueFailed() | handler(job: Job, err: Error) Job job failed with reason err . |
@OnQueuePaused() | @OnGlobalQueuePaused() | handler() The queue has been paused. |
@OnQueueResumed() | @OnGlobalQueueResumed() | handler(job: Job) The queue has been resumed. |
@OnQueueCleaned() | @OnGlobalQueueCleaned() | handler(jobs: Job[], type: string) Old jobs have been cleaned from the queue. jobs is an array of cleaned jobs, and type is the type of jobs cleaned. |
@OnQueueDrained() | @OnGlobalQueueDrained() | handler() Emitted whenever the queue has processed all the waiting jobs (even if there can be some delayed jobs not yet processed). |
@OnQueueRemoved() | @OnGlobalQueueRemoved() | handler(job: Job) Job job was successfully removed. |
在监听全局事件时,方法签名可能与本地方法签名略有不同。具体来说,任何在本地版本中接收job
对象的方法签名,在全局版本中都会接收jobId
(number
)。在这种情况下,要获取对实际job
对象的引用,请使用Queue#getJob
方法。应等待此调用,因此应将处理程序声明为异步
。例如:
@OnGlobalQueueCompleted()
async onGlobalCompleted(jobId: number, result: any) {
const job = await this.immediateQueue.getJob(jobId);
console.log('(Global) on completed: job ', job.id, ' -> result: ', result);
}
要访问 Queue
对象(进行 getJob()
调用),您当然必须注入它。此外,队列必须在您注入它的模块中注册。
除了特定的事件侦听器装饰器外,您还可以将通用的 @OnQueueEvent()
装饰器与 BullQueueEvents
或 BullQueueGlobalEvents
枚举结合使用。此处 了解有关事件的更多信息。
队列管理
队列有一个 API,允许您执行管理功能,如暂停和恢复、检索各种状态下的作业数量等。您可以在 此处 找到完整的队列 API。直接在 Queue
对象上调用这些方法中的任何一个,如下面的暂停/恢复示例所示。
使用 pause()
方法调用暂停队列。暂停的队列在恢复之前不会处理新作业,但当前正在处理的作业将继续,直到完成为止。
await audioQueue.pause()
要恢复暂停的队列,请使用 resume()
方法,如下所示:
await audioQueue.resume()
独立进程
作业处理程序也可以在独立(分叉)进程中运行(来源)。这有几个优点:
- 该进程是沙盒化的,因此如果它崩溃,也不会影响工作器。
- 您可以运行阻塞代码而不影响队列(作业不会停滞)。
- 多核 CPU 的利用率更高。
- 与 redis 的连接更少。
import { join } from 'node:path'
import { Module } from '@nestjs/common'
import { BullModule } from '@nestjs/bull'
@Module({
imports: [
BullModule.registerQueue({
name: 'audio',
processors: [join(__dirname, 'processor.js')],
}),
],
})
export class AppModule {}
请注意,由于您的函数是在分叉进程中执行的,因此依赖注入(和 IoC 容器)将不可用。这意味着您的处理器函数将需要包含(或创建)它所需的所有外部依赖项实例。
import { DoneCallback, Job } from 'bull'
export default function (job: Job, cb: DoneCallback) {
console.log(`[${process.pid}] ${JSON.stringify(job.data)}`)
cb(null, 'It works')
}
异步配置
您可能希望异步传递 bull
选项,而不是静态传递。在这种情况下,请使用 forRootAsync()
方法,该方法提供了几种处理异步配置的方法。同样,如果您想异步传递队列选项,请使用 registerQueueAsync()
方法。
一种方法是使用工厂函数:
BullModule.forRootAsync({
useFactory: () => ({
redis: {
host: 'localhost',
port: 6379,
},
}),
})
我们的工厂的行为与其他任何 异步提供程序 类似(例如,它可以是 async
,并且能够通过 inject
注入依赖项)。
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
redis: {
host: configService.get('QUEUE_HOST'),
port: configService.get('QUEUE_PORT'),
},
}),
inject: [ConfigService],
})
或者,您可以使用 useClass
语法:
BullModule.forRootAsync({
useClass: BullConfigService,
})
上述构造将在BullModule
内实例化BullConfigService
,并通过调用createSharedConfiguration()
使用它来提供选项对象。请注意,这意味着BullConfigService
必须实现SharedBullConfigurationFactory
接口,如下所示:
@Injectable()
class BullConfigService implements SharedBullConfigurationFactory {
createSharedConfiguration(): BullModuleOptions {
return {
redis: {
host: 'localhost',
port: 6379,
},
}
}
}
为了防止在 BullModule
内创建 BullConfigService
并使用从其他模块导入的提供程序,您可以使用 useExisting
语法。
BullModule.forRootAsync({
imports: [ConfigModule],
useExisting: ConfigService,
})
此构造的工作原理与 useClass
相同,但有一个关键区别 - BullModule
将查找导入的模块以重用现有的 ConfigService
,而不是实例化新的。
Example
此处 提供了一个工作示例。