gRPC 是一个现代的、开源的、高性能的 RPC 框架,可以在任何环境中运行。它可以高效地连接数据中心内和跨数据中心的服务,并提供可插入式的负载平衡、跟踪、健康检查和身份验证支持。
与许多 RPC 系统一样,gRPC 基于根据可远程调用的函数(方法)定义服务的概念。对于每种方法,您都可以定义参数和返回类型。服务、参数和返回类型使用 Google 的开源语言中立 协议缓冲区 机制在 .proto
文件中定义。
借助 gRPC 传输器,Nest 使用 .proto
文件动态绑定客户端和服务器,以便轻松实现远程过程调用,自动序列化和反序列化结构化数据。
安装
要开始构建基于 gRPC 的微服务,首先安装所需的软件包:
$ npm i --save @grpc/grpc-js @grpc/proto-loader
概述
与其他 Nest 微服务传输层实现一样,您可以使用传递给 createMicroservice()
方法的 options 对象的 transport
属性选择 gRPC 传输器机制。在下面的示例中,我们将设置一个英雄服务。options
属性提供有关该服务的元数据;其属性如下所述。
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.GRPC,
options: {
package: 'hero',
protoPath: join(__dirname, 'hero/hero.proto'),
},
})
join()
函数从 path
包导入;Transport
枚举从 @nestjs/microservices
包导入。
在 nest-cli.json
文件中,我们添加了 assets
属性,允许我们分发非 TypeScript 文件,以及 watchAssets
- 以开启对所有非 TypeScript 资源的监视。在我们的例子中,我们希望 .proto
文件自动复制到 dist
文件夹。
{
"compilerOptions": {
"assets": ["**/*.proto"],
"watchAssets": true
}
}
选项
gRPC 传输器选项对象公开了下面描述的属性。
package | Protobuf 包名称(与 .proto 文件中的 package 设置匹配)。必需 |
protoPath | .proto 文件的绝对路径(或相对于根目录的路径)。必需
|
url | 连接 URL。格式为 ip 地址/dns 名称:端口 的字符串(例如,对于 Docker 服务器为 '0.0.0.0:50051' ),定义传输器建立连接的地址/端口。可选。默认为 'localhost:5000' |
protoLoader | 用于加载 .proto 文件的实用程序的 NPM 包名称。可选。默认为 '@grpc/proto-loader' |
loader | @grpc/proto-loader 选项。这些选项提供对 .proto 文件行为的详细控制。可选。有关更多详细信息,请参阅
此处 |
credentials | 服务器凭据。可选。 在此处阅读更多信息 |
示例 gRPC 服务
让我们定义名为HeroesService
的示例 gRPC 服务。在上面的 options
对象中,protoPath
属性设置了 .proto
定义文件 hero.proto
的路径。hero.proto
文件使用 协议缓冲区 构造。它如下所示:
// hero/hero.proto
syntax = "proto3";
package hero;
service HeroesService {
rpc FindOne (HeroById) returns (Hero) {}
}
message HeroById {
int32 id = 1;
}
message Hero {
int32 id = 1;
string name = 2;
}
我们的 HeroesService
公开了一个 FindOne()
方法。此方法需要一个 HeroById
类型的输入参数并返回一个 Hero
消息(协议缓冲区使用 message
元素来定义参数类型和返回类型)。
接下来,我们需要实现服务。要定义满足此定义的处理程序,我们在控制器中使用 @GrpcMethod()
装饰器,如下所示。此装饰器提供将方法声明为 gRPC 服务方法所需的元数据。
前面的微服务章节中介绍的 @MessagePattern()
装饰器(阅读更多)不用于基于 gRPC 的微服务。对于基于 gRPC 的微服务,@GrpcMethod()
装饰器有效地取代了它。
@Controller()
export class HeroesController {
@GrpcMethod('HeroesService', 'FindOne')
findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
const items = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
]
return items.find(({ id }) => id === data.id)
}
}
@GrpcMethod()
装饰器从 @nestjs/microservices
包导入,而 Metadata
和 ServerUnaryCall
则从 grpc
包导入。
上面显示的装饰器有两个参数。第一个是服务名称(例如 'HeroesService''),对应于
hero.proto中的
HeroesService服务定义。第二个(字符串
'FindOne'')对应于 hero.proto
文件中 HeroesService
内定义的 FindOne()
rpc 方法。
findOne()
处理程序方法有三个参数,分别是从调用者传递的 data
、存储 gRPC 请求元数据的 metadata
和 call
,用于获取 GrpcCall
对象属性,例如用于将元数据发送到客户端的 sendMetadata
。
两个 @GrpcMethod()
装饰器参数都是可选的。如果在调用时不带第二个参数(例如 'FindOne'
),Nest 将根据将处理程序名称转换为大写驼峰式命名法,自动将 .proto
文件 rpc 方法与处理程序关联(例如,findOne
处理程序与 FindOne
rpc 调用定义关联)。如下所示。
@Controller()
export class HeroesController {
@GrpcMethod('HeroesService')
findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
const items = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
]
return items.find(({ id }) => id === data.id)
}
}
您还可以省略第一个 @GrpcMethod()
参数。在这种情况下,Nest 会根据定义处理程序的 class 名称自动将处理程序与 proto 定义文件中的服务定义相关联。例如,在下面的代码中,类 HeroesService
根据名称 'HeroesService'
的匹配将其处理程序方法与 hero.proto
文件中的 HeroesService
服务定义相关联。
@Controller()
export class HeroesService {
@GrpcMethod()
findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
const items = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
]
return items.find(({ id }) => id === data.id)
}
}
客户端
Nest 应用程序可以充当 gRPC 客户端,使用 .proto
文件中定义的服务。您可以通过 ClientGrpc
对象访问远程服务。您可以通过多种方式获取 ClientGrpc
对象。
首选技术是导入 ClientsModule
。使用 register()
方法将 .proto
文件中定义的服务包绑定到注入令牌,并配置服务。name
属性是注入令牌。对于 gRPC 服务,使用 transport: Transport.GRPC
。options
属性是一个具有 上面 描述的相同属性的对象。
imports: [
ClientsModule.register([
{
name: 'HERO_PACKAGE',
transport: Transport.GRPC,
options: {
package: 'hero',
protoPath: join(__dirname, 'hero/hero.proto'),
},
},
]),
]
register()
方法接受一个对象数组。通过提供逗号分隔的注册对象列表来注册多个包。
注册后,我们可以使用 @Inject()
注入配置的 ClientGrpc
对象。然后我们使用 ClientGrpc
对象的 getService()
方法来检索服务实例,如下所示。
@Injectable()
export class AppService implements OnModuleInit {
private heroesService: HeroesService
constructor(@Inject('HERO_PACKAGE') private client: ClientGrpc) {}
onModuleInit() {
this.heroesService = this.client.getService<HeroesService>('HeroesService')
}
getHero(): Observable<string> {
return this.heroesService.findOne({ id: 1 })
}
}
除非在 proto 加载器配置中将 keepCase
选项设置为 true
(微服务传输器配置中的 options.loader.keepcase
),否则 gRPC 客户端不会发送名称中包含下划线 _
的字段。
请注意,与其他微服务传输方法中使用的技术相比,存在细微差别。我们使用 ClientGrpc
类而不是 ClientProxy
类,它提供了 getService()
方法。getService()
通用方法将服务名称作为参数并返回其实例(如果可用)。
或者,您可以使用 @Client()
装饰器实例化 ClientGrpc
对象,如下所示:
@Injectable()
export class AppService implements OnModuleInit {
@Client({
transport: Transport.GRPC,
options: {
package: 'hero',
protoPath: join(__dirname, 'hero/hero.proto'),
},
})
client: ClientGrpc
private heroesService: HeroesService
onModuleInit() {
this.heroesService = this.client.getService<HeroesService>('HeroesService')
}
getHero(): Observable<string> {
return this.heroesService.findOne({ id: 1 })
}
}
最后,对于更复杂的场景,我们可以使用 ClientProxyFactory
类注入动态配置的客户端,如 此处 所述。
无论哪种情况,我们最终都会得到对我们的 HeroesService
代理对象的引用,该对象公开了在 .proto
文件中定义的同一组方法。现在,当我们访问此代理对象(即 heroesService
)时,gRPC 系统会自动序列化请求,将它们转发到远程系统,返回响应并反序列化响应。由于 gRPC 为我们屏蔽了这些网络通信细节,因此 heroesService
看起来和行为就像本地提供程序。
注意,所有服务方法都是小写驼峰式(以遵循语言的自然惯例)。因此,例如,虽然我们的 .proto
文件 HeroesService
定义包含 FindOne()
函数,但 heroesService
实例将提供 findOne()
方法。
interface HeroesService {
findOne: (data: { id: number }) => Observable<any>
}
消息处理程序还能够返回Observable
,在这种情况下,结果值将被发出,直到流完成。
@Get()
call(): Observable<any> {
return this.heroesService.findOne({ id: 1 });
}
要发送 gRPC 元数据(连同请求),您可以传递第二个参数,如下所示:
call(): Observable<any> {
const metadata = new Metadata();
metadata.add('Set-Cookie', 'yummy_cookie=choco');
return this.heroesService.findOne({ id: 1 }, metadata);
}
Metadata
类是从 grpc
包导入的。
请注意,这需要更新我们之前定义的 HeroesService
接口。
示例
此处 提供了一个工作示例。
gRPC 反射
gRPC 服务器反射规范 是一项标准,它允许 gRPC 客户端请求有关服务器公开的 API 的详细信息,类似于公开 REST API 的 OpenAPI 文档。这可以使使用开发人员调试工具(如 grpc-ui 或 postman)变得更加容易。
要向您的服务器添加 gRPC 反射支持,请首先安装所需的实现包:
$ npm i --save @grpc/reflection
然后可以使用 gRPC 服务器选项中的 onLoadPackageDefinition
挂钩将其挂接到 gRPC 服务器中,如下所示:
import { ReflectionService } from '@grpc/reflection'
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
options: {
onLoadPackageDefinition: (pkg, server) => {
new ReflectionService(pkg).addToServer(server)
},
},
})
现在,您的服务器将使用反射规范响应请求 API 详细信息的消息。
gRPC 流
gRPC 本身支持长期实时连接,通常称为流
。流对于聊天、观察或块数据传输等情况很有用。在官方文档 此处 中查找更多详细信息。
Nest 以两种可能的方式支持 GRPC 流处理程序:
- RxJS
Subject
+Observable
处理程序:可用于在 Controller 方法内部直接编写响应或传递给Subject
/Observable
消费者 - 纯 GRPC 调用流处理程序:可用于传递给某个执行器,该执行器将处理 Node 标准
Duplex
流处理程序的其余调度。
流示例
让我们定义一个名为 HelloService
的新示例 gRPC 服务。 hello.proto
文件使用 协议缓冲区 构建。它如下所示:
// hello/hello.proto
syntax = "proto3";
package hello;
service HelloService {
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
}
message HelloRequest {
string greeting = 1;
}
message HelloResponse {
string reply = 1;
}
由于返回的流可以发出多个值,因此可以使用 @GrpcMethod
装饰器(如上例所示)简单地实现 LotsOfGreetings
方法。
基于此 .proto
文件,让我们定义 HelloService
接口:
interface HelloService {
bidiHello: (upstream: Observable<HelloRequest>) => Observable<HelloResponse>
lotsOfGreetings: (
upstream: Observable<HelloRequest>,
) => Observable<HelloResponse>
}
interface HelloRequest {
greeting: string
}
interface HelloResponse {
reply: string
}
主题策略
@GrpcStreamMethod()
装饰器将函数参数提供为 RxJS Observable
。这样,我们可以接收和处理多条消息。
@GrpcStreamMethod()
bidiHello(messages: Observable<any>, metadata: Metadata, call: ServerDuplexStream<any, any>): Observable<any> {
const subject = new Subject();
const onNext = message => {
console.log(message);
subject.next({
reply: 'Hello, world!'
});
};
const onComplete = () => subject.complete();
messages.subscribe({
next: onNext,
complete: onComplete,
});
return subject.asObservable();
}
为了支持与 @GrpcStreamMethod()
装饰器的全双工交互,控制器方法必须返回 RxJS Observable
。
Metadata
和 ServerUnaryCall
类/接口从 grpc
包导入。
根据服务定义(在 .proto
文件中),BidiHello
方法应将请求流式传输到服务。要从客户端向流发送多条异步消息,我们利用 RxJS ReplaySubject
类。
const helloService = this.client.getService<HelloService>('HelloService')
const helloRequest$ = new ReplaySubject<HelloRequest>()
helloRequest$.next({ greeting: 'Hello (1)!' })
helloRequest$.next({ greeting: 'Hello (2)!' })
helloRequest$.complete()
return helloService.bidiHello(helloRequest$)
在上面的示例中,我们向流中写入了两条消息(next()
调用),并通知服务我们已完成数据发送(complete()
调用)。
调用流处理程序
当方法返回值定义为 stream
时,@GrpcStreamCall()
装饰器将函数参数提供为 grpc.ServerDuplexStream
,它支持标准方法,如 .on('data',callback)
、.write(message)
或 .cancel()
。有关可用方法的完整文档可在 此处 找到。
或者,当方法返回值不是 stream
时,@GrpcStreamCall()
装饰器提供两个函数参数,分别是 grpc.ServerReadableStream
(阅读更多这里) 和 callback
。
让我们从实现应该支持全双工交互的 BidiHello
开始。
@GrpcStreamCall()
bidiHello(requestStream: any) {
requestStream.on('data', message => {
console.log(message);
requestStream.write({
reply: 'Hello, world!'
});
});
}
此装饰器不需要提供任何特定的返回参数。预计该流的处理方式与任何其他标准流类型类似。
在上面的示例中,我们使用 write()
方法将对象写入响应流。每次我们的服务收到新的数据块时,都会调用作为第二个参数传递给 .on()
方法的回调。
让我们实现 LotsOfGreetings
方法。
@GrpcStreamCall()
lotsOfGreetings(requestStream: any, callback: (err: unknown, value: HelloResponse) => void) {
requestStream.on('data', message => {
console.log(message);
});
requestStream.on('end', () => callback(null, { reply: 'Hello, world!' }));
}
在这里,我们使用 callback
函数在 requestStream
处理完成后发送响应。
gRPC 元数据
元数据是关于特定 RPC 调用的信息,形式为键值对列表,其中键是字符串,值通常是字符串,但可以是二进制数据。元数据对 gRPC 本身是不透明的 - 它允许客户端提供与调用相关的信息给服务器,反之亦然。元数据可能包括身份验证令牌、用于监控目的的请求标识符和标签,以及数据信息,例如数据集中的记录数。
要读取 @GrpcMethod()
处理程序中的元数据,请使用第二个参数(元数据),其类型为 Metadata
(从 grpc
包导入)。
要从处理程序发回元数据,请使用 ServerUnaryCall#sendMetadata()
方法(第三个处理程序参数)。
@Controller()
export class HeroesService {
@GrpcMethod()
findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
const serverMetadata = new Metadata()
const items = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
]
serverMetadata.add('Set-Cookie', 'yummy_cookie=choco')
call.sendMetadata(serverMetadata)
return items.find(({ id }) => id === data.id)
}
}
同样,要读取使用 @GrpcStreamMethod()
处理程序(主题策略)注释的处理程序中的元数据,请使用第二个参数(元数据),其类型为 Metadata
(从 grpc
包导入)。
要从处理程序发回元数据,请使用 ServerDuplexStream#sendMetadata()
方法(第三个处理程序参数)。
要从 调用流处理程序(使用 @GrpcStreamCall()
装饰器注释的处理程序)中读取元数据,请在 requestStream
引用上监听 metadata
事件,如下所示:
requestStream.on('metadata', (metadata: Metadata) => {
const meta = metadata.get('X-Meta')
})