定制传输 Custom transporters

导读

Nest 提供了各种开箱即用的 传输器,以及允许开发人员构建新的自定义传输策略的 API。 传输器使您能够使用可插入通信层和非常简单的应用程序级消息协议通过网络连接组件(阅读完整文章)。

提示

使用 Nest 构建微服务并不一定意味着您必须使用 @nestjs/microservices 包。例如,如果您想与外部服务(比如说用不同语言编写的其他微服务)进行通信,您可能不需要 @nestjs/microservice 库提供的所有功能。

事实上,如果您不需要装饰器(@EventPattern@MessagePattern)来声明性地定义订阅者,那么运行 独立应用程序 并手动维护连接/订阅频道应该足以满足大多数用例的需求,并且将为您提供更大的灵活性。

使用自定义传输器,您可以集成任何消息传递系统/协议(包括 Google Cloud Pub/Sub、Amazon Kinesis 等)或扩展现有传输器,并在其上添加额外功能(例如,MQTT 的 QoS)。

提示

为了更好地了解 Nest 微服务的工作原理以及如何扩展现有传输器的功能,我们建议阅读 NestJS 微服务实战高级 NestJS 微服务 文章系列。

创建策略

首先,让我们定义一个代表我们自定义传输器的类。

ts
import { CustomTransportStrategy, Server } from '@nestjs/microservices'

class GoogleCloudPubSubServer
  extends Server
  implements CustomTransportStrategy {
  /**
   * This method is triggered when you run "app.listen()".
   */
  listen(callback: () => void) {
    callback()
  }

  /**
   * This method is triggered on application shutdown.
   */
  close() {}
}
警告

请注意,我们不会在本章中实现功能齐全的 Google Cloud Pub/Sub 服务器,因为这需要深入研究传输器特定的技术细节。

在上面的示例中,我们声明了 GoogleCloudPubSubServer 类,并提供了由 CustomTransportStrategy 接口强制执行的 listen()close() 方法。 此外,我们的类扩展了从 @nestjs/microservices 包导入的 Server 类,该类提供了一些有用的方法,例如 Nest 运行时用于注册消息处理程序的方法。或者,如果您想扩展现有传输策略的功能,您可以扩展相应的服务器类,例如 ServerRedis。 按照惯例,我们为我们的类添加了 Server 后缀,因为它将负责订阅消息/事件(并在必要时响应它们)。

有了这个,我们现在可以使用自定义策略而不是使用内置传输器,如下所示:

ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  {
    strategy: new GoogleCloudPubSubServer(),
  },
)

基本上,我们传递的不是带有 transportoptions 属性的普通传输器选项对象,而是传递单个属性 strategy,其值是我们自定义传输器类的一个实例。

回到我们的 GoogleCloudPubSubServer 类,在实际应用程序中,我们将建立与我们的消息代理/外部服务的连接,并在 listen() 方法中注册订阅者/监听特定频道(然后在 close() 拆卸方法中删除订阅并关闭连接), 但由于这需要很好地理解 Nest 微服务如何相互通信,我们建议阅读此 文章系列。 在本章中,我们将重点介绍 Server 类提供的功能以及如何利用它们来构建自定义策略。

例如,假设在我们的应用程序中的某个地方定义了以下消息处理程序:

ts
@MessagePattern('echo')
echo(@Payload() data: object) {
  return data;
}

此消息处理程序将由 Nest 运行时自动注册。使用 Server 类,您可以查看已注册的消息模式,还可以访问和执行分配给它们的实际方法。 为了测试这一点,让我们在调用 callback 函数之前在 listen() 方法中添加一个简单的 console.log

ts
listen(callback: () => void) {
  console.log(this.messageHandlers);
  callback();
}

应用程序重新启动后,您将在终端中看到以下日志:

ts
Map { 'echo' => [AsyncFunction] { isEventHandler: false } }
Hint

如果我们使用 @EventPattern 装饰器,您将看到相同的输出,但 isEventHandler 属性设置为 true

如您所见,messageHandlers 属性是所有消息(和事件)处理程序的 Map 集合,其中使用模式作为键。 现在,您可以使用键(例如 "echo")来接收对消息处理程序的引用:

ts
async listen(callback: () => void) {
  const echoHandler = this.messageHandlers.get('echo');
  console.log(await echoHandler('Hello world!'));
  callback();
}

一旦我们执行 echoHandler 并传递任意字符串作为参数(这里是 ``Hello world!`),我们应该在控制台中看到它:

bash
Hello world!

这意味着我们的方法处理程序已正确执行。

当使用带有 InterceptorsCustomTransportStrategy 时,处理程序将被包装到 RxJS 流中。 这意味着您需要订阅它们才能执行流的底层逻辑(例如,在执行拦截器后继续进入控制器逻辑)。

下面可以看到一个示例:

ts
async listen(callback: () => void) {
  const echoHandler = this.messageHandlers.get('echo');
  const streamOrResult = await echoHandler('Hello World');
  if (isObservable(streamOrResult)) {
    streamOrResult.subscribe();
  }
  callback();
}

客户端代理

正如我们在第一部分中提到的,您不一定需要使用 @nestjs/microservices 包来创建微服务,但如果您决定这样做并且需要集成自定义策略,您也需要提供一个客户端类。

提示

同样,实现与所有 @nestjs/microservices 功能(例如流式传输)兼容的功能齐全的客户端类需要很好地理解框架使用的通信技术。要了解更多信息,请查看此 文章

要与外部服务通信/发出和发布消息(或事件),您可以使用特定于库的 SDK 包,也可以实现扩展 ClientProxy 的自定义客户端类,如下所示:

ts
import { ClientProxy, ReadPacket, WritePacket } from '@nestjs/microservices'

class GoogleCloudPubSubClient extends ClientProxy {
  async connect(): Promise<any> {}
  async close() {}
  async dispatchEvent(packet: ReadPacket<any>): Promise<any> {}
  publish(
    packet: ReadPacket<any>,
    callback: (packet: WritePacket<any>) => void,
  ): Function {}
}
Warning

Please, note we won't be implementing a fully-featured Google Cloud Pub/Sub client in this chapter as this would require diving into transporter specific technical details.

As you can see, ClientProxy class requires us to provide several methods for establishing & closing the connection and publishing messages (publish) and events (dispatchEvent). Note, if you don't need a request-response communication style support, you can leave the publish() method empty. Likewise, if you don't need to support event-based communication, skip the dispatchEvent() method.

To observe what and when those methods are executed, let's add multiple console.log calls, as follows:

ts
class GoogleCloudPubSubClient extends ClientProxy {
  async connect(): Promise<any> {
    console.log('connect')
  }

  async close() {
    console.log('close')
  }

  async dispatchEvent(packet: ReadPacket<any>): Promise<any> {
    return console.log('event to dispatch: ', packet)
  }

  publish(
    packet: ReadPacket<any>,
    callback: (packet: WritePacket<any>) => void,
  ): Function {
    console.log('message:', packet)

    // In a real-world application, the "callback" function should be executed
    // with payload sent back from the responder. Here, we'll simply simulate (5 seconds delay)
    // that response came through by passing the same "data" as we've originally passed in.
    setTimeout(() => callback({ response: packet.data }), 5000)

    return () => console.log('teardown')
  }
}

有了这个,让我们创建一个 GoogleCloudPubSubClient 类的实例并运行 send() 方法(您可能在前面的章节中看到过),订阅返回的可观察流。

ts
const googlePubSubClient = new GoogleCloudPubSubClient()
googlePubSubClient
  .send('pattern', 'Hello world!')
  .subscribe(response => console.log(response))

现在,你应该在终端中看到以下输出:

ts
connect
message: { pattern: 'pattern', data: 'Hello world!' }
Hello world! // <-- after 5 seconds

为了测试我们的teardown方法(我们的publish()方法返回)是否正确执行,让我们将超时运算符应用到我们的流,将其设置为 2 秒,以确保它在我们的setTimeout调用callback函数之前抛出。

ts
const googlePubSubClient = new GoogleCloudPubSubClient()
googlePubSubClient
  .send('pattern', 'Hello world!')
  .pipe(timeout(2000))
  .subscribe(
    response => console.log(response),
    error => console.error(error.message),
  )
提示

timeout 操作符从 rxjs/operators 包导入。

应用 timeout 操作符后,您的终端输出应如下所示:

ts
connect
message: { pattern: 'pattern', data: 'Hello world!' }
teardown // <-- teardown
Timeout has occurred

要分派事件(而不是发送消息),请使用 emit() 方法:

ts
googlePubSubClient.emit('event', 'Hello world!')

这就是您应该在控制台中看到的内容:

ts
connect
event to dispatch:  { pattern: 'event', data: 'Hello world!' }

消息序列化

如果您需要在客户端的响应序列化周围添加一些自定义逻辑,则可以使用扩展ClientProxy类或其子类之一的自定义类。要修改成功的请求,您可以覆盖serializeResponse方法,要修改通过此客户端的任何错误,您可以覆盖serializeError方法。要使用此自定义类,您可以使用customClass属性将类本身传递给ClientsModule.register()方法。下面是一个自定义ClientProxy的示例,它将每个错误序列化为RpcException

error-handling.proxy
ts
import { ClientTcp, RpcException } from '@nestjs/microservices'

class ErrorHandlingProxy extends ClientTCP {
  serializeError(err: Error) {
    return new RpcException(err)
  }
}

然后在 ClientsModule 中使用它,如下所示:

app.module
ts
@Module({
  imports: [
    ClientsModule.register([{
      name: 'CustomProxy',
      customClass: ErrorHandlingProxy,
    }]),
  ]
})
export class AppModule
hint

这是传递给 customClass 的类本身,而不是类的实例。Nest 将在后台为您创建实例,并将传递给 options 属性的任何选项传递给新的 ClientProxy