MQTT MQTT

导读

MQTT(消息队列遥测传输)是一种开源的轻量级消息传递协议,针对低延迟进行了优化。此协议提供了一种使用发布/订阅模型连接设备的可扩展且经济高效的方法。基于 MQTT 构建的通信系统由发布服务器、代理和一个或多个客户端组成。它专为受限设备和低带宽、高延迟或不可靠的网络而设计。

安装

要开始构建基于 MQTT 的微服务,首先安装所需的包:

bash
$ npm i --save mqtt

概述

要使用 MQTT 传输器,请将以下选项对象传递给 createMicroservice() 方法:

ts
main
ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.MQTT,
  options: {
    url: 'mqtt://localhost:1883',
  },
})
提示

Transport 枚举从 @nestjs/microservices 包导入。

选项

options 对象特定于所选传输器。MQTT 传输器公开了 此处 描述的属性。

客户端

与其他微服务传输器一样,您有 多个选项 用于创建 MQTT ClientProxy 实例。

创建实例的一种方法是使用 ClientsModule。要使用 ClientsModule 创建客户端实例,请导入它并使用 register() 方法传递一个选项对象,该对象具有与 createMicroservice() 方法中上面显示的属性相同的属性,以及一个用作注入令牌的 name 属性。在此处阅读有关 ClientsModule 的更多信息。

ts
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'MATH_SERVICE',
        transport: Transport.MQTT,
        options: {
          url: 'mqtt://localhost:1883',
        }
      },
    ]),
  ]
})

还可以使用其他选项来创建客户端(ClientProxyFactory@Client())。您可以在 此处 阅读有关它们的信息。

上下文

在更复杂的场景中,您可能希望访问有关传入请求的更多信息。使用 MQTT 传输器时,您可以访问 MqttContext 对象。

ts
TS
ts
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: MqttContext) {
  console.log(`Topic: ${context.getTopic()}`);
}
提示

@Payload()@Ctx()MqttContext@nestjs/microservices 包导入。

要访问原始 mqtt packet,请使用 MqttContext 对象的 getPacket() 方法,如下所示:

ts
TS
ts
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: MqttContext) {
  console.log(context.getPacket());
}

通配符

订阅可能针对明确主题,也可能包含通配符。有两个通配符可用,+#+ 是单级通配符,而 # 是多级通配符,涵盖多个主题级别。

ts
TS
ts
@MessagePattern('sensors/+/temperature/+')
getTemperature(@Ctx() context: MqttContext) {
  console.log(`Topic: ${context.getTopic()}`);
}

服务质量 (QoS)

使用 @MessagePattern@EventPattern 装饰器创建的任何订阅都将使用 QoS 0 进行订阅。如果需要更高的 QoS,可以在建立连接时使用 subscribeOptions 块进行全局设置,如下所示:

ts
main
ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.MQTT,
  options: {
    url: 'mqtt://localhost:1883',
    subscribeOptions: {
      qos: 2
    },
  },
})

如果需要特定于主题的 QoS,请考虑创建 自定义传输器

记录构建器

要配置消息选项(调整 QoS 级别、设置 Retain 或 DUP 标志或向有效负载添加其他属性),您可以使用 MqttRecordBuilder 类。例如,要将 QoS 设置为 2,请使用 setQoS 方法,如下所示:

ts
const userProperties = { 'x-version': '1.0.0' };
const record = new MqttRecordBuilder(':cat:')
  .setProperties({ userProperties })
  .setQoS(1)
  .build();
client.send('replace-emoji', record).subscribe(...);
提示

MqttRecordBuilder 类从 @nestjs/microservices 包导出。

您也可以通过访问 MqttContext 在服务器端读取这些选项。

ts
TS
ts
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: MqttContext): string {
  const { properties: { userProperties } } = context.getPacket();
  return userProperties['x-version'] === '1.0.0' ? '🐱' : '🐈';
}

在某些情况下您可能想要为多个请求配置用户属性,您可以将这些选项传递给ClientProxyFactory

ts
import { Module } from '@nestjs/common'
import { ClientProxyFactory, Transport } from '@nestjs/microservices'

@Module({
  providers: [
    {
      provide: 'API_v1',
      useFactory: () =>
        ClientProxyFactory.create({
          transport: Transport.MQTT,
          options: {
            url: 'mqtt://localhost:1833',
            userProperties: { 'x-version': '1.0.0' },
          },
        }),
    },
  ],
})
export class ApiModule {}