RabbitMQ RabbitMQ

导读

RabbitMQ 是一个开源的轻量级消息代理,支持多种消息传递协议。它可以部署在分布式和联合配置中,以满足高规模、高可用性的要求。此外,它是部署最广泛的消息代理,在世界各地的小型初创公司和大型企业中使用。

安装

要开始构建基于 RabbitMQ 的微服务,首先安装所需的软件包:

bash
$ npm i --save amqplib amqp-connection-manager

概述

要使用 RabbitMQ 传输器,请将以下选项对象传递给 createMicroservice() 方法:

ts
main
ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.RMQ,
  options: {
    urls: ['amqp://localhost:5672'],
    queue: 'cats_queue',
    queueOptions: {
      durable: false
    },
  },
})
提示

Transport 枚举从 @nestjs/microservices 包导入。

选项

options 属性特定于所选传输器。RabbitMQ 传输器公开了下面描述的属性。

urlsConnection urls
queueQueue name which your server will listen to
prefetchCountSets the prefetch count for the channel
isGlobalPrefetchCountEnables per channel prefetching
noAckIf false, manual acknowledgment mode enabled
consumerTagConsumer Tag Identifier here
queueOptionsAdditional queue options read more here
socketOptionsAdditional socket options read more here
headersHeaders to be sent along with every message

客户端

与其他微服务传输器一样,您有多个选项 用于创建 RabbitMQ ClientProxy 实例。

创建实例的一种方法是使用 ClientsModule。 要使用 ClientsModule 创建客户端实例,请导入它并使用 register() 方法传递具有与 createMicroservice() 方法中上面显示的相同属性的选项对象,以及用作注入令牌的 name 属性。 在此处 了解有关 ClientsModule 的更多信息。

ts
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'MATH_SERVICE',
        transport: Transport.RMQ,
        options: {
          urls: ['amqp://localhost:5672'],
          queue: 'cats_queue',
          queueOptions: {
            durable: false
          },
        },
      },
    ]),
  ]
})

还可以使用其他选项来创建客户端(ClientProxyFactory@Client())。您可以在 此处 阅读有关它们的信息。

上下文

在更复杂的场景中,您可能希望访问有关传入请求的更多信息。使用 RabbitMQ 传输器时,您可以访问 RmqContext 对象。

ts
TS
ts
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(`Pattern: ${context.getPattern()}`);
}
提示

@Payload()@Ctx()RmqContext@nestjs/microservices 包导入。

要访问原始 RabbitMQ 消息(包含 propertiesfieldscontent),请使用 RmqContext 对象的 getMessage() 方法,如下所示:

ts
TS
ts
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getMessage());
}

要检索对 RabbitMQ 通道的引用,请使用 RmqContext 对象的 getChannelRef 方法,如下所示:

ts
TS
ts
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getChannelRef());
}

消息确认

为了确保消息永不丢失,RabbitMQ 支持 消息确认。消费者会发回确认,告知 RabbitMQ 某条消息已收到并处理,并且 RabbitMQ 可以删除该消息。如果消费者死亡(其通道已关闭、连接已关闭或 TCP 连接已丢失)且未发送确认,RabbitMQ 会认为该消息未得到完全处理,并会将其重新排队。

要启用手动确认模式,请将 noAck 属性设置为 false

ts
options: {
  urls: ['amqp://localhost:5672'],
  queue: 'cats_queue',
  noAck: false,
  queueOptions: {
    durable: false
  },
},

当打开手动消费者确认时,我们必须从工作者发送适当的确认以表明我们已完成任务。

ts
TS
ts
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();

  channel.ack(originalMsg);
}

记录构建器

要配置消息选项,您可以使用 RmqRecordBuilder 类(注意:这也适用于基于事件的流)。例如,要设置 headerspriority 属性,请使用 setOptions 方法,如下所示:

ts
const message = ':cat:';
const record = new RmqRecordBuilder(message)
  .setOptions({
    headers: {
      ['x-version']: '1.0.0',
    },
    priority: 3,
  })
  .build();

this.client.send('replace-emoji', record).subscribe(...);
提示

RmqRecordBuilder 类从 @nestjs/microservices 包导出。

您也可以通过访问 RmqContext 在服务器端读取这些值,如下所示:

ts
TS
ts
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: RmqContext): string {
  const { properties: { headers } } = context.getMessage();
  return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}