概述 Overview

导读

除了传统的(有时称为单片)应用程序架构之外,Nest 原生支持微服务架构开发风格。本文档其他地方讨论的大多数概念(例如依赖项注入、装饰器、异常过滤器、管道、保护和拦截器)同样适用于微服务。Nest 尽可能抽象实现细节,以便相同的组件可以跨基于 HTTP 的平台、WebSocket 和微服务运行。本节介绍 Nest 特定于微服务的方面。

在 Nest 中,微服务从根本上来说是一个使用不同于 HTTP 的 传输 层的应用程序。

img

Nest 支持几种内置传输层实现,称为 传输器,负责在不同的微服务实例之间传输消息。大多数传输器原生支持 请求-响应基于事件 的消息样式。 Nest 将每个传输器的实现细节抽象为请求-响应和基于事件的消息传递的规范接口。这样可以轻松地从一个传输层切换到另一个传输层 - 例如利用特定传输层的特定可靠性或性能功能 - 而不会影响您的应用程序代码。

安装

要开始构建微服务,首先安装所需的包:

bash
$ npm i --save @nestjs/microservices

入门

要实例化微服务,请使用 NestFactory 类的 createMicroservice() 方法:

ts
main
ts
import { NestFactory } from '@nestjs/core'
import { MicroserviceOptions, Transport } from '@nestjs/microservices'
import { AppModule } from './app.module'

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.TCP,
    },
  )
  await app.listen()
}
bootstrap()
提示

微服务默认使用 TCP 传输层。

createMicroservice() 方法的第二个参数是 options 对象。此对象可能由两个成员组成:

transport指定传输器(例如,Transport.NATS
options确定传输器行为的传输器特定选项对象

options 对象特定于所选传输器。TCP 传输器公开 下文描述的属性。对于其他传输器(例如,Redis、MQTT 等),请参阅相关章节以了解可用选项的描述。

host连接主机名
port连接端口
retryAttempts重试消息的次数(默认值:0
retryDelay消息重试尝试之间的延迟(毫秒)(默认值:0
serializer自定义 序列化器用于传出消息
反序列化器自定义反序列化器用于传入消息
socketClass扩展TcpSocket的自定义Socket(默认值:JsonSocket
tlsOptions配置 tls 协议的选项

模式

微服务通过模式识别消息和事件。模式是纯值,例如文字对象或字符串。模式会自动序列化并与消息的数据部分一起通过网络发送。通过这种方式,消息发送者和消费者可以协调哪些请求由哪些处理程序使用。

请求-响应

当您需要在各种外部服务之间交换消息时,请求-响应消息样式很有用。使用此范例,您可以确定服务确实收到了消息(无需手动实现消息 ACK 协议)。但是,请求-响应范例并不总是最佳选择。例如,使用基于日志的持久性的流式传输器(如 KafkaNATS 流式传输)针对解决不同范围的问题进行了优化,更符合事件消息传递范式(有关更多详细信息,请参阅下面的 基于事件的消息传递)。

为了启用请求-响应消息类型,Nest 创建了两个逻辑通道 - 一个负责传输数据,而另一个负责等待传入的响应。 对于某些底层传输,例如 NATS,这种双通道支持是开箱即用的。 对于其他传输,Nest 通过手动创建单独的通道进行补偿。 这可能会产生开销,因此如果您不需要请求-响应消息样式,则应考虑使用基于事件的方法。

要基于请求-响应范例创建消息处理程序,请使用从@nestjs/microservices包导入的@MessagePattern()装饰器。此装饰器应仅在 controller 类中使用,因为它们是应用程序的入口点。在提供程序内部使用它们不会产生任何影响,因为它们会被 Nest 运行时忽略。

ts
math.controller
ts
import { Controller } from '@nestjs/common'
import { MessagePattern } from '@nestjs/microservices'

@Controller()
export class MathController {
  @MessagePattern({ cmd: 'sum' })
  accumulate(data: number[]): number {
    return (data || []).reduce((a, b) => a + b)
  }
}

在上面的代码中,accumulate() 消息处理程序 监听满足 {{ '{' }} cmd: 'sum' {{ '}' }} 消息模式的消息。消息处理程序接受一个参数,即从客户端传递的 data。在本例中,数据是一个要累积的数字数组。

异步响应

消息处理程序能够同步或异步响应。因此,支持 async 方法。

ts
TS
ts
@MessagePattern({ cmd: 'sum' })
async accumulate(data: number[]): Promise<number> {
  return (data || []).reduce((a, b) => a + b);
}

消息处理程序还能够返回Observable,在这种情况下,结果值将被发出,直到流完成。

ts
TS
ts
@MessagePattern({ cmd: 'sum' })
accumulate(data: number[]): Observable<number> {
  return from([1, 2, 3]);
}

在上面的示例中,消息处理程序将响应3次(针对数组中的每个项目)。

基于事件

虽然请求-响应方法非常适合在服务之间交换消息,但当您的消息样式是基于事件时(当您只想发布事件而不等待响应时),它就不太合适了。在这种情况下,您不希望请求-响应需要开销来维护两个通道。

假设您只想通知另一项服务系统的这一部分发生了某种情况。这是基于事件的消息样式的理想用例。

要创建事件处理程序,我们使用从@nestjs/microservices包导入的@EventPattern()装饰器。

ts
TS
ts
@EventPattern('user_created')
async handleUserCreated(data: Record<string, unknown>) {
  // business logic
}
提示

您可以为单个事件模式注册多个事件处理程序,所有事件处理程序都将自动并行触发。

handleUserCreated() 事件处理程序监听 'user_created' 事件。事件处理程序接受一个参数,即从客户端传递的 data(在本例中,是通过网络发送的事件负载)。

装饰器

在更复杂的场景中,您可能希望访问有关传入请求的更多信息。例如,在具有通配符订阅的 NATS 的情况下,您可能希望获取生产者已将消息发送到的原始主题。同样,在 Kafka 中,您可能希望访问消息头。为了实现这一点,您可以使用内置装饰器,如下所示:

ts
TS
ts
@MessagePattern('time.us.*')
getDate(@Payload() data: number[], @Ctx() context: NatsContext) {
  console.log(`Subject: ${context.getSubject()}`); // e.g. "time.us.east"
  return new Date().toLocaleTimeString(...);
}
提示

@Payload()@Ctx()NatsContext 是从 @nestjs/microservices 导入的。

提示

您还可以将属性键传递给 @Payload() 装饰器,以从传入的有效负载对象中提取特定属性,例如 @Payload('id')

客户端

客户端 Nest 应用程序可以使用 ClientProxy 类交换消息或将事件发布到 Nest 微服务。此类定义了几种方法,例如 send()(用于请求-响应消息传递)和 emit()(用于事件驱动消息传递),可让您与远程微服务进行通信。通过以下方式之一获取此类的实例。

一种技术是导入 ClientsModule,它公开静态 register() 方法。此方法接受一个参数,该参数是代表微服务传输器的对象数组。每个这样的对象都有一个 name 属性、一个可选的 transport 属性(默认为 Transport.TCP)和一个可选的特定于传输器的 options 属性。

name 属性用作注入令牌,可用于在需要时注入 ClientProxy 的实例。作为注入令牌,name 属性的值可以是任意字符串或 JavaScript 符号,如此处 所述。

options 属性是一个具有与我们之前在 createMicroservice() 方法中看到的相同属性的对象。

ts
@Module({
  imports: [
    ClientsModule.register([
      { name: 'MATH_SERVICE', transport: Transport.TCP },
    ]),
  ]
  ...
})

一旦模块被导入,我们就可以使用@Inject()装饰器,注入通过上面显示的'MATH_SERVICE'传输器选项指定配置的ClientProxy实例。

ts
constructor(
  @Inject('MATH_SERVICE') private client: ClientProxy,
) {}
提示

ClientsModuleClientProxy 类是从 @nestjs/microservices 包导入的。

有时我们可能需要从另一个服务(例如 ConfigService)获取传输器配置,而不是在我们的客户端应用程序中对其进行硬编码。为此,我们可以使用 ClientProxyFactory 类注册一个 自定义提供程序。此类有一个静态 create() 方法,它接受传输器选项对象,并返回自定义的 ClientProxy 实例。

ts
@Module({
  providers: [
    {
      provide: 'MATH_SERVICE',
      useFactory: (configService: ConfigService) => {
        const mathSvcOptions = configService.getMathSvcOptions();
        return ClientProxyFactory.create(mathSvcOptions);
      },
      inject: [ConfigService],
    }
  ]
})
提示

ClientProxyFactory 是从 @nestjs/microservices 包导入的。

另一个选项是使用 @Client() 属性装饰器。

ts
@Client({ transport: Transport.TCP })
client: ClientProxy;
提示

@Client() 装饰器是从 @nestjs/microservices 包导入的。

使用 @Client() 装饰器不是首选技术,因为它更难测试,更难共享客户端实例。

ClientProxy懒惰 的。它不会立即启动连接。相反,它将在第一次微服务调用之前建立,然后在每个后续调用中重用。但是,如果您想延迟应用程序引导过程直到建立连接,则可以使用 OnApplicationBootstrap 生命周期钩子内的 ClientProxy 对象的 connect() 方法手动启动连接。

ts
async onApplicationBootstrap() {
await this.client.connect();
}

如果无法创建连接,connect() 方法将拒绝并返回相应的错误对象。

发送消息

ClientProxy 公开了一个 send() 方法。此方法旨在调用微服务并返回带有其响应的 Observable。因此,我们可以轻松订阅发出的值。

ts
TS
ts
accumulate(): Observable<number> {
  const pattern = { cmd: 'sum' };
  const payload = [1, 2, 3];
  return this.client.send<number>(pattern, payload);
}

send() 方法接受两个参数,patternpayloadpattern 应与 @MessagePattern() 装饰器中定义的参数匹配。payload 是我们想要传输到远程微服务的消息。此方法返回一个 Observable,这意味着您必须在发送消息之前明确订阅它。

发布事件

要发送事件,请使用 ClientProxy 对象的 emit() 方法。此方法将事件发布到消息代理。

ts
TS
ts
async publish() {
  this.client.emit<number>('user_created', new UserCreatedEvent());
}

emit() 方法接受两个参数,patternpayloadpattern 应与 @EventPattern() 装饰器中定义的参数匹配。payload 是我们希望传输到远程微服务的事件负载。此方法返回一个 Observable(不同于 send() 返回的冷 Observable),这意味着无论您是否明确订阅可观察对象,代理都会立即尝试传递事件。

范围

对于来自不同编程语言背景的人来说,可能会意外地发现 Nest 中的几乎所有内容都是在传入请求之间共享的。我们有一个到数据库的连接池、具有全局状态的单例服务等。请记住,Node.js 不遵循请求/响应多线程无状态模型,在该模型中,每个请求都由单独的线程处理。因此,使用单例实例对我们的应用程序来说是完全安全的

但是,在某些特殊情况下,处理程序基于请求的生命周期可能是所需的行为,例如 GraphQL 应用程序中的按请求缓存、请求跟踪或多租户。了解如何控制范围 此处

请求范围的处理程序和提供程序可以使用 @Inject() 装饰器结合 CONTEXT 令牌注入 RequestContext

ts
import { Inject, Injectable, Scope } from '@nestjs/common'
import { CONTEXT, RequestContext } from '@nestjs/microservices'

@Injectable({ scope: Scope.REQUEST })
export class CatsService {
  constructor(@Inject(CONTEXT) private ctx: RequestContext) {}
}

这提供了对 RequestContext 对象的访问,该对象具有两个属性:

ts
export interface RequestContext<T = any> {
  pattern: string | Record<string, any>
  data: T
}

data 属性是消息生产者发送的消息负载。pattern 属性是用于识别适当的处理程序来处理传入消息的模式。

处理超时

在分布式系统中,有时微服务可能会关闭或不可用。为了避免无限长时间的等待,您可以使用超时。超时是与其他服务通信时非常有用的模式。要将超时应用于微服务调用,您可以使用 RxJS timeout 运算符。如果微服务在一定时间内未响应请求,则会引发异常,可以捕获并适当处理该异常。

要解决这个问题,您必须使用 rxjs 包。只需在管道中使用 timeout 运算符:

ts
TS
ts
this.client
  .send<TResult, TInput>(pattern, data)
  .pipe(timeout(5000))
提示

timeout 操作符从 rxjs/operators 包导入。

5 秒后,如果微服务没有响应,则会抛出错误。