gRPC gRPC

导读

gRPC 是一个现代的、开源的、高性能的 RPC 框架,可以在任何环境中运行。它可以高效地连接数据中心内和跨数据中心的服务,并提供可插入式的负载平衡、跟踪、健康检查和身份验证支持。

与许多 RPC 系统一样,gRPC 基于根据可远程调用的函数(方法)定义服务的概念。对于每种方法,您都可以定义参数和返回类型。服务、参数和返回类型使用 Google 的开源语言中立 协议缓冲区 机制在 .proto 文件中定义。

借助 gRPC 传输器,Nest 使用 .proto 文件动态绑定客户端和服务器,以便轻松实现远程过程调用,自动序列化和反序列化结构化数据。

安装

要开始构建基于 gRPC 的微服务,首先安装所需的软件包:

bash
$ npm i --save @grpc/grpc-js @grpc/proto-loader

概述

与其他 Nest 微服务传输层实现一样,您可以使用传递给 createMicroservice() 方法的 options 对象的 transport 属性选择 gRPC 传输器机制。在下面的示例中,我们将设置一个英雄服务。options 属性提供有关该服务的元数据;其属性如下所述。

ts
main
ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.GRPC,
  options: {
    package: 'hero',
    protoPath: join(__dirname, 'hero/hero.proto'),
  },
})
提示

join() 函数从 path 包导入;Transport 枚举从 @nestjs/microservices 包导入。

nest-cli.json 文件中,我们添加了 assets 属性,允许我们分发非 TypeScript 文件,以及 watchAssets - 以开启对所有非 TypeScript 资源的监视。在我们的例子中,我们希望 .proto 文件自动复制到 dist 文件夹。

json
{
  "compilerOptions": {
    "assets": ["**/*.proto"],
    "watchAssets": true
  }
}

选项

gRPC 传输器选项对象公开了下面描述的属性。

packageProtobuf 包名称(与 .proto 文件中的 package 设置匹配)。必需
protoPath.proto 文件的绝对路径(或相对于根目录的路径)。必需
url连接 URL。格式为 ip 地址/dns 名称:端口 的字符串(例如,对于 Docker 服务器为 '0.0.0.0:50051'),定义传输器建立连接的地址/端口。可选。默认为 'localhost:5000'
protoLoader用于加载 .proto 文件的实用程序的 NPM 包名称。可选。默认为 '@grpc/proto-loader'
loader@grpc/proto-loader 选项。这些选项提供对 .proto 文件行为的详细控制。可选。有关更多详细信息,请参阅 此处
credentials 服务器凭据。可选。 在此处阅读更多信息

示例 gRPC 服务

让我们定义名为HeroesService的示例 gRPC 服务。在上面的 options 对象中,protoPath 属性设置了 .proto 定义文件 hero.proto 的路径。hero.proto 文件使用 协议缓冲区 构造。它如下所示:

ts
// hero/hero.proto
syntax = "proto3";

package hero;

service HeroesService {
  rpc FindOne (HeroById) returns (Hero) {}
}

message HeroById {
  int32 id = 1;
}

message Hero {
  int32 id = 1;
  string name = 2;
}

我们的 HeroesService 公开了一个 FindOne() 方法。此方法需要一个 HeroById 类型的输入参数并返回一个 Hero 消息(协议缓冲区使用 message 元素来定义参数类型和返回类型)。

接下来,我们需要实现服务。要定义满足此定义的处理程序,我们在控制器中使用 @GrpcMethod() 装饰器,如下所示。此装饰器提供将方法声明为 gRPC 服务方法所需的元数据。

提示

前面的微服务章节中介绍的 @MessagePattern() 装饰器(阅读更多)不用于基于 gRPC 的微服务。对于基于 gRPC 的微服务,@GrpcMethod() 装饰器有效地取代了它。

ts
heroes.controller
ts
@Controller()
export class HeroesController {
  @GrpcMethod('HeroesService', 'FindOne')
  findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
    const items = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ]
    return items.find(({ id }) => id === data.id)
  }
}
提示

@GrpcMethod() 装饰器从 @nestjs/microservices 包导入,而 MetadataServerUnaryCall 则从 grpc 包导入。

上面显示的装饰器有两个参数。第一个是服务名称(例如 'HeroesService''),对应于 hero.proto中的HeroesService服务定义。第二个(字符串'FindOne'')对应于 hero.proto 文件中 HeroesService 内定义的 FindOne() rpc 方法。

findOne() 处理程序方法有三个参数,分别是从调用者传递的 data、存储 gRPC 请求元数据的 metadatacall,用于获取 GrpcCall 对象属性,例如用于将元数据发送到客户端的 sendMetadata

两个 @GrpcMethod() 装饰器参数都是可选的。如果在调用时不带第二个参数(例如 'FindOne'),Nest 将根据将处理程序名称转换为大写驼峰式命名法,自动将 .proto 文件 rpc 方法与处理程序关联(例如,findOne 处理程序与 FindOne rpc 调用定义关联)。如下所示。

ts
heroes.controller
ts
@Controller()
export class HeroesController {
  @GrpcMethod('HeroesService')
  findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
    const items = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ]
    return items.find(({ id }) => id === data.id)
  }
}

您还可以省略第一个 @GrpcMethod() 参数。在这种情况下,Nest 会根据定义处理程序的 class 名称自动将处理程序与 proto 定义文件中的服务定义相关联。例如,在下面的代码中,类 HeroesService 根据名称 'HeroesService' 的匹配将其处理程序方法与 hero.proto 文件中的 HeroesService 服务定义相关联。

ts
heroes.controller
ts
@Controller()
export class HeroesService {
  @GrpcMethod()
  findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
    const items = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ]
    return items.find(({ id }) => id === data.id)
  }
}

客户端

Nest 应用程序可以充当 gRPC 客户端,使用 .proto 文件中定义的服务。您可以通过 ClientGrpc 对象访问远程服务。您可以通过多种方式获取 ClientGrpc 对象。

首选技术是导入 ClientsModule。使用 register() 方法将 .proto 文件中定义的服务包绑定到注入令牌,并配置服务。name 属性是注入令牌。对于 gRPC 服务,使用 transport: Transport.GRPCoptions 属性是一个具有 上面 描述的相同属性的对象。

ts
imports: [
  ClientsModule.register([
    {
      name: 'HERO_PACKAGE',
      transport: Transport.GRPC,
      options: {
        package: 'hero',
        protoPath: join(__dirname, 'hero/hero.proto'),
      },
    },
  ]),
]
提示

register() 方法接受一个对象数组。通过提供逗号分隔的注册对象列表来注册多个包。

注册后,我们可以使用 @Inject() 注入配置的 ClientGrpc 对象。然后我们使用 ClientGrpc 对象的 getService() 方法来检索服务实例,如下所示。

ts
@Injectable()
export class AppService implements OnModuleInit {
  private heroesService: HeroesService

  constructor(@Inject('HERO_PACKAGE') private client: ClientGrpc) {}

  onModuleInit() {
    this.heroesService = this.client.getService<HeroesService>('HeroesService')
  }

  getHero(): Observable<string> {
    return this.heroesService.findOne({ id: 1 })
  }
}
警告

除非在 proto 加载器配置中将 keepCase 选项设置为 true(微服务传输器配置中的 options.loader.keepcase),否则 gRPC 客户端不会发送名称中包含下划线 _ 的字段。

请注意,与其他微服务传输方法中使用的技术相比,存在细微差别。我们使用 ClientGrpc 类而不是 ClientProxy 类,它提供了 getService() 方法。getService() 通用方法将服务名称作为参数并返回其实例(如果可用)。

或者,您可以使用 @Client() 装饰器实例化 ClientGrpc 对象,如下所示:

ts
@Injectable()
export class AppService implements OnModuleInit {
  @Client({
    transport: Transport.GRPC,
    options: {
      package: 'hero',
      protoPath: join(__dirname, 'hero/hero.proto'),
    },
  })
  client: ClientGrpc

  private heroesService: HeroesService

  onModuleInit() {
    this.heroesService = this.client.getService<HeroesService>('HeroesService')
  }

  getHero(): Observable<string> {
    return this.heroesService.findOne({ id: 1 })
  }
}

最后,对于更复杂的场景,我们可以使用 ClientProxyFactory 类注入动态配置的客户端,如 此处 所述。

无论哪种情况,我们最终都会得到对我们的 HeroesService 代理对象的引用,该对象公开了在 .proto 文件中定义的同一组方法。现在,当我们访问此代理对象(即 heroesService)时,gRPC 系统会自动序列化请求,将它们转发到远程系统,返回响应并反序列化响应。由于 gRPC 为我们屏蔽了这些网络通信细节,因此 heroesService 看起来和行为就像本地提供程序。

注意,所有服务方法都是小写驼峰式(以遵循语言的自然惯例)。因此,例如,虽然我们的 .proto 文件 HeroesService 定义包含 FindOne() 函数,但 heroesService 实例将提供 findOne() 方法。

ts
interface HeroesService {
  findOne: (data: { id: number }) => Observable<any>
}

消息处理程序还能够返回Observable,在这种情况下,结果值将被发出,直到流完成。

ts
heroes.controller
ts
@Get()
call(): Observable<any> {
  return this.heroesService.findOne({ id: 1 });
}

要发送 gRPC 元数据(连同请求),您可以传递第二个参数,如下所示:

ts
call(): Observable<any> {
  const metadata = new Metadata();
  metadata.add('Set-Cookie', 'yummy_cookie=choco');

  return this.heroesService.findOne({ id: 1 }, metadata);
}
提示

Metadata 类是从 grpc 包导入的。

请注意,这需要更新我们之前定义的 HeroesService 接口。

示例

此处 提供了一个工作示例。

gRPC 反射

gRPC 服务器反射规范 是一项标准,它允许 gRPC 客户端请求有关服务器公开的 API 的详细信息,类似于公开 REST API 的 OpenAPI 文档。这可以使使用开发人员调试工具(如 grpc-ui 或 postman)变得更加容易。

要向您的服务器添加 gRPC 反射支持,请首先安装所需的实现包:

bash
$ npm i --save @grpc/reflection

然后可以使用 gRPC 服务器选项中的 onLoadPackageDefinition 挂钩将其挂接到 gRPC 服务器中,如下所示:

main
ts
import { ReflectionService } from '@grpc/reflection'

const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  options: {
    onLoadPackageDefinition: (pkg, server) => {
      new ReflectionService(pkg).addToServer(server)
    },
  },
})

现在,您的服务器将使用反射规范响应请求 API 详细信息的消息。

gRPC 流

gRPC 本身支持长期实时连接,通常称为。流对于聊天、观察或块数据传输等情况很有用。在官方文档 此处 中查找更多详细信息。

Nest 以两种可能的方式支持 GRPC 流处理程序:

  • RxJS Subject + Observable 处理程序:可用于在 Controller 方法内部直接编写响应或传递给 Subject/Observable 消费者
  • 纯 GRPC 调用流处理程序:可用于传递给某个执行器,该执行器将处理 Node 标准 Duplex 流处理程序的其余调度。

流示例

让我们定义一个名为 HelloService 的新示例 gRPC 服务。 hello.proto 文件使用 协议缓冲区 构建。它如下所示:

ts
// hello/hello.proto
syntax = "proto3";

package hello;

service HelloService {
  rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
  rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
}

message HelloRequest {
  string greeting = 1;
}

message HelloResponse {
  string reply = 1;
}
提示

由于返回的流可以发出多个值,因此可以使用 @GrpcMethod 装饰器(如上例所示)简单地实现 LotsOfGreetings 方法。

基于此 .proto 文件,让我们定义 HelloService 接口:

ts
interface HelloService {
  bidiHello: (upstream: Observable<HelloRequest>) => Observable<HelloResponse>
  lotsOfGreetings: (
    upstream: Observable<HelloRequest>,
  ) => Observable<HelloResponse>
}

interface HelloRequest {
  greeting: string
}

interface HelloResponse {
  reply: string
}
提示

proto 接口可以由 ts-proto 包自动生成,详细了解 这里

主题策略

@GrpcStreamMethod() 装饰器将函数参数提供为 RxJS Observable。这样,我们可以接收和处理多条消息。

ts
@GrpcStreamMethod()
bidiHello(messages: Observable<any>, metadata: Metadata, call: ServerDuplexStream<any, any>): Observable<any> {
  const subject = new Subject();

  const onNext = message => {
    console.log(message);
    subject.next({
      reply: 'Hello, world!'
    });
  };
  const onComplete = () => subject.complete();
  messages.subscribe({
    next: onNext,
    complete: onComplete,
  });

  return subject.asObservable();
}
警告

为了支持与 @GrpcStreamMethod() 装饰器的全双工交互,控制器方法必须返回 RxJS Observable

提示

MetadataServerUnaryCall 类/接口从 grpc 包导入。

根据服务定义(在 .proto 文件中),BidiHello 方法应将请求流式传输到服务。要从客户端向流发送多条异步消息,我们利用 RxJS ReplaySubject 类。

ts
const helloService = this.client.getService<HelloService>('HelloService')
const helloRequest$ = new ReplaySubject<HelloRequest>()

helloRequest$.next({ greeting: 'Hello (1)!' })
helloRequest$.next({ greeting: 'Hello (2)!' })
helloRequest$.complete()

return helloService.bidiHello(helloRequest$)

在上面的示例中,我们向流中写入了两条消息(next() 调用),并通知服务我们已完成数据发送(complete() 调用)。

调用流处理程序

当方法返回值定义为 stream 时,@GrpcStreamCall() 装饰器将函数参数提供为 grpc.ServerDuplexStream,它支持标准方法,如 .on('data',callback).write(message).cancel()。有关可用方法的完整文档可在 此处 找到。

或者,当方法返回值不是 stream 时,@GrpcStreamCall() 装饰器提供两个函数参数,分别是 grpc.ServerReadableStream (阅读更多这里) 和 callback

让我们从实现应该支持全双工交互的 BidiHello 开始。

ts
@GrpcStreamCall()
bidiHello(requestStream: any) {
  requestStream.on('data', message => {
    console.log(message);
    requestStream.write({
      reply: 'Hello, world!'
    });
  });
}
提示

此装饰器不需要提供任何特定的返回参数。预计该流的处理方式与任何其他标准流类型类似。

在上面的示例中,我们使用 write() 方法将对象写入响应流。每次我们的服务收到新的数据块时,都会调用作为第二个参数传递给 .on() 方法的回调。

让我们实现 LotsOfGreetings 方法。

ts
@GrpcStreamCall()
lotsOfGreetings(requestStream: any, callback: (err: unknown, value: HelloResponse) => void) {
  requestStream.on('data', message => {
    console.log(message);
  });
  requestStream.on('end', () => callback(null, { reply: 'Hello, world!' }));
}

在这里,我们使用 callback 函数在 requestStream 处理完成后发送响应。

gRPC 元数据

元数据是关于特定 RPC 调用的信息,形式为键值对列表,其中键是字符串,值通常是字符串,但可以是二进制数据。元数据对 gRPC 本身是不透明的 - 它允许客户端提供与调用相关的信息给服务器,反之亦然。元数据可能包括身份验证令牌、用于监控目的的请求标识符和标签,以及数据信息,例如数据集中的记录数。

要读取 @GrpcMethod() 处理程序中的元数据,请使用第二个参数(元数据),其类型为 Metadata(从 grpc 包导入)。

要从处理程序发回元数据,请使用 ServerUnaryCall#sendMetadata() 方法(第三个处理程序参数)。

ts
heroes.controller
ts
@Controller()
export class HeroesService {
  @GrpcMethod()
  findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
    const serverMetadata = new Metadata()
    const items = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ]

    serverMetadata.add('Set-Cookie', 'yummy_cookie=choco')
    call.sendMetadata(serverMetadata)

    return items.find(({ id }) => id === data.id)
  }
}

同样,要读取使用 @GrpcStreamMethod() 处理程序(主题策略)注释的处理程序中的元数据,请使用第二个参数(元数据),其类型为 Metadata(从 grpc 包导入)。

要从处理程序发回元数据,请使用 ServerDuplexStream#sendMetadata() 方法(第三个处理程序参数)。

要从 调用流处理程序(使用 @GrpcStreamCall() 装饰器注释的处理程序)中读取元数据,请在 requestStream 引用上监听 metadata 事件,如下所示:

ts
requestStream.on('metadata', (metadata: Metadata) => {
  const meta = metadata.get('X-Meta')
})