RabbitMQ 是一个开源的轻量级消息代理,支持多种消息传递协议。它可以部署在分布式和联合配置中,以满足高规模、高可用性的要求。此外,它是部署最广泛的消息代理,在世界各地的小型初创公司和大型企业中使用。
安装
要开始构建基于 RabbitMQ 的微服务,首先安装所需的软件包:
$ npm i --save amqplib amqp-connection-manager
概述
要使用 RabbitMQ 传输器,请将以下选项对象传递给 createMicroservice()
方法:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false
},
},
})
Transport
枚举从 @nestjs/microservices
包导入。
选项
options
属性特定于所选传输器。RabbitMQ 传输器公开了下面描述的属性。
urls | Connection urls |
queue | Queue name which your server will listen to |
prefetchCount | Sets the prefetch count for the channel |
isGlobalPrefetchCount | Enables per channel prefetching |
noAck | If false , manual acknowledgment mode enabled |
consumerTag | Consumer Tag Identifier here |
queueOptions | Additional queue options read more here |
socketOptions | Additional socket options read more here |
headers | Headers to be sent along with every message |
客户端
与其他微服务传输器一样,您有多个选项 用于创建 RabbitMQ ClientProxy
实例。
创建实例的一种方法是使用 ClientsModule
。 要使用 ClientsModule
创建客户端实例,请导入它并使用 register()
方法传递具有与 createMicroservice()
方法中上面显示的相同属性的选项对象,以及用作注入令牌的 name
属性。 在此处 了解有关 ClientsModule
的更多信息。
@Module({
imports: [
ClientsModule.register([
{
name: 'MATH_SERVICE',
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false
},
},
},
]),
]
})
还可以使用其他选项来创建客户端(ClientProxyFactory
或 @Client()
)。您可以在 此处 阅读有关它们的信息。
上下文
在更复杂的场景中,您可能希望访问有关传入请求的更多信息。使用 RabbitMQ 传输器时,您可以访问 RmqContext
对象。
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(`Pattern: ${context.getPattern()}`);
}
@Payload()
、@Ctx()
和 RmqContext
从 @nestjs/microservices
包导入。
要访问原始 RabbitMQ 消息(包含 properties
、fields
和 content
),请使用 RmqContext
对象的 getMessage()
方法,如下所示:
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(context.getMessage());
}
要检索对 RabbitMQ 通道的引用,请使用 RmqContext
对象的 getChannelRef
方法,如下所示:
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(context.getChannelRef());
}
消息确认
为了确保消息永不丢失,RabbitMQ 支持 消息确认。消费者会发回确认,告知 RabbitMQ 某条消息已收到并处理,并且 RabbitMQ 可以删除该消息。如果消费者死亡(其通道已关闭、连接已关闭或 TCP 连接已丢失)且未发送确认,RabbitMQ 会认为该消息未得到完全处理,并会将其重新排队。
要启用手动确认模式,请将 noAck
属性设置为 false
:
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
noAck: false,
queueOptions: {
durable: false
},
},
当打开手动消费者确认时,我们必须从工作者发送适当的确认以表明我们已完成任务。
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
const channel = context.getChannelRef();
const originalMsg = context.getMessage();
channel.ack(originalMsg);
}
记录构建器
要配置消息选项,您可以使用 RmqRecordBuilder
类(注意:这也适用于基于事件的流)。例如,要设置 headers
和 priority
属性,请使用 setOptions
方法,如下所示:
const message = ':cat:';
const record = new RmqRecordBuilder(message)
.setOptions({
headers: {
['x-version']: '1.0.0',
},
priority: 3,
})
.build();
this.client.send('replace-emoji', record).subscribe(...);
RmqRecordBuilder
类从 @nestjs/microservices
包导出。
您也可以通过访问 RmqContext
在服务器端读取这些值,如下所示:
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: RmqContext): string {
const { properties: { headers } } = context.getMessage();
return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}