MQTT(消息队列遥测传输)是一种开源的轻量级消息传递协议,针对低延迟进行了优化。此协议提供了一种使用发布/订阅模型连接设备的可扩展且经济高效的方法。基于 MQTT 构建的通信系统由发布服务器、代理和一个或多个客户端组成。它专为受限设备和低带宽、高延迟或不可靠的网络而设计。
安装
要开始构建基于 MQTT 的微服务,首先安装所需的包:
$ npm i --save mqtt
概述
要使用 MQTT 传输器,请将以下选项对象传递给 createMicroservice()
方法:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
},
})
Transport
枚举从 @nestjs/microservices
包导入。
选项
options
对象特定于所选传输器。MQTT 传输器公开了 此处 描述的属性。
客户端
与其他微服务传输器一样,您有 多个选项 用于创建 MQTT ClientProxy
实例。
创建实例的一种方法是使用 ClientsModule
。要使用 ClientsModule
创建客户端实例,请导入它并使用 register()
方法传递一个选项对象,该对象具有与 createMicroservice()
方法中上面显示的属性相同的属性,以及一个用作注入令牌的 name
属性。在此处阅读有关 ClientsModule
的更多信息。
@Module({
imports: [
ClientsModule.register([
{
name: 'MATH_SERVICE',
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
}
},
]),
]
})
还可以使用其他选项来创建客户端(ClientProxyFactory
或 @Client()
)。您可以在 此处 阅读有关它们的信息。
上下文
在更复杂的场景中,您可能希望访问有关传入请求的更多信息。使用 MQTT 传输器时,您可以访问 MqttContext
对象。
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: MqttContext) {
console.log(`Topic: ${context.getTopic()}`);
}
@Payload()
、@Ctx()
和 MqttContext
从 @nestjs/microservices
包导入。
要访问原始 mqtt packet,请使用 MqttContext
对象的 getPacket()
方法,如下所示:
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: MqttContext) {
console.log(context.getPacket());
}
通配符
订阅可能针对明确主题,也可能包含通配符。有两个通配符可用,+
和 #
。+
是单级通配符,而 #
是多级通配符,涵盖多个主题级别。
@MessagePattern('sensors/+/temperature/+')
getTemperature(@Ctx() context: MqttContext) {
console.log(`Topic: ${context.getTopic()}`);
}
服务质量 (QoS)
使用 @MessagePattern
或 @EventPattern
装饰器创建的任何订阅都将使用 QoS 0 进行订阅。如果需要更高的 QoS,可以在建立连接时使用 subscribeOptions
块进行全局设置,如下所示:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
subscribeOptions: {
qos: 2
},
},
})
如果需要特定于主题的 QoS,请考虑创建 自定义传输器。
记录构建器
要配置消息选项(调整 QoS 级别、设置 Retain 或 DUP 标志或向有效负载添加其他属性),您可以使用 MqttRecordBuilder
类。例如,要将 QoS
设置为 2
,请使用 setQoS
方法,如下所示:
const userProperties = { 'x-version': '1.0.0' };
const record = new MqttRecordBuilder(':cat:')
.setProperties({ userProperties })
.setQoS(1)
.build();
client.send('replace-emoji', record).subscribe(...);
MqttRecordBuilder
类从 @nestjs/microservices
包导出。
您也可以通过访问 MqttContext
在服务器端读取这些选项。
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: MqttContext): string {
const { properties: { userProperties } } = context.getPacket();
return userProperties['x-version'] === '1.0.0' ? '🐱' : '🐈';
}
在某些情况下您可能想要为多个请求配置用户属性,您可以将这些选项传递给ClientProxyFactory
。
import { Module } from '@nestjs/common'
import { ClientProxyFactory, Transport } from '@nestjs/microservices'
@Module({
providers: [
{
provide: 'API_v1',
useFactory: () =>
ClientProxyFactory.create({
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1833',
userProperties: { 'x-version': '1.0.0' },
},
}),
},
],
})
export class ApiModule {}