订阅的一个常见用例是通知客户端有关特定事件的信息,例如创建新对象、更新字段等(阅读更多信息此处)。
使用 Apollo 驱动程序启用订阅
要启用订阅,请将installSubscriptionHandlers
属性设置为true
。
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
installSubscriptionHandlers: true,
})
installSubscriptionHandlers
配置选项已从最新版本的 Apollo 服务器中删除,并且很快也将在此包中弃用。默认情况下,installSubscriptionHandlers
将回退使用 subscriptions-transport-ws
(阅读更多),但我们强烈建议改用 graphql-ws
(阅读更多) 库。
要切换到使用 graphql-ws
包,请使用以下配置:
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'graphql-ws': true
},
})
例如,为了向后兼容,您还可以同时使用这两个包(subscriptions-transport-ws
和 graphql-ws
)。
代码优先
要使用代码优先方法创建订阅,我们使用 @Subscription()
装饰器(从 @nestjs/graphql
包导出)和 graphql-subscriptions
包中的 PubSub
类,它提供了一个简单的 发布/订阅 API。
以下订阅处理程序通过调用 PubSub#asyncIterator
来负责订阅事件。此方法采用单个参数 triggerName
,它对应于事件主题名称。
const pubSub = new PubSub()
@Resolver(of => Author)
export class AuthorResolver {
// ...
@Subscription(returns => Comment)
commentAdded() {
return pubSub.asyncIterator('commentAdded')
}
}
所有装饰器都从 @nestjs/graphql
包导出,而 PubSub
类从 graphql-subscriptions
包导出。
PubSub
是一个公开简单 publish
和 subscribe API
的类。有关更多信息,请 此处。请注意,Apollo 文档警告默认实现不适合生产(更多信息,请 此处)。生产应用程序应使用由外部存储支持的 PubSub
实现(阅读更多信息,请点击此处](https://github.com/apollographql/graphql-subscriptions#pubsub-implementations))。
这将导致在 SDL 中生成 GraphQL 模式的以下部分:
type Subscription {
commentAdded(): Comment!
}
请注意,根据定义,订阅返回一个具有单个顶级属性的对象,该属性的键是订阅的名称。此名称要么从订阅处理程序方法的名称(即上面的 commentAdded
)继承,要么通过将带有键 name
的选项作为第二个参数传递给 @Subscription()
装饰器来明确提供,如下所示。
@Subscription(returns => Comment, {
name: 'commentAdded',
})
subscribeToCommentAdded() {
return pubSub.asyncIterator('commentAdded');
}
此构造生成与上一个代码示例相同的 SDL,但允许我们将方法名称与订阅分离。
发布
现在,要发布事件,我们使用 PubSub#publish
方法。这通常在突变中使用,以在对象图的一部分发生变化时触发客户端更新。例如:
@Mutation(returns => Post)
async addComment(
@Args('postId', { type: () => Int }) postId: number,
@Args('comment', { type: () => Comment }) comment: CommentInput,
) {
const newComment = this.commentsService.addComment({ id: postId, comment });
pubSub.publish('commentAdded', { commentAdded: newComment });
return newComment;
}
PubSub#publish
方法将 triggerName
(再次将其视为事件主题名称)作为第一个参数,将事件负载作为第二个参数。如前所述,订阅按定义返回一个值,并且该值具有形状。再次查看我们为 commentAdded
订阅生成的 SDL:
type Subscription {
commentAdded(): Comment!
}
这告诉我们订阅必须返回一个对象,其顶级属性名称为 commentAdded
,该对象具有一个值为 Comment
对象。需要注意的重要一点是,PubSub#publish
方法发出的事件负载的形状必须与预期从订阅返回的值的形状相对应。因此,在上面的示例中,pubSub.publish('commentAdded', {{ '{' }} commentAdded: newComment {{ '}' }})
语句会发布具有适当形状的有效负载的 commentAdded
事件。如果这些形状不匹配,您的订阅将在 GraphQL 验证阶段失败。
过滤订阅
要过滤掉特定事件,请将 filter
属性设置为过滤函数。此函数的作用类似于传递给数组 filter
的函数。它需要两个参数:payload
包含事件有效负载(由事件发布者发送),variables
接受订阅请求期间传入的任何参数。它返回一个布尔值,确定是否应将此事件发布给客户端侦听器。
@Subscription(returns => Comment, {
filter: (payload, variables) =>
payload.commentAdded.title === variables.title,
})
commentAdded(@Args('title') title: string) {
return pubSub.asyncIterator('commentAdded');
}
改变订阅负载
要改变已发布的事件负载,请将 resolve
属性设置为函数。该函数接收事件负载(由事件发布者发送)并返回适当的值。
@Subscription(returns => Comment, {
resolve: value => value,
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}
如果您使用 resolve
选项,则应返回未包装的有效负载(例如,在我们的示例中,直接返回 newComment
对象,而不是 {{ '{' }} commentAdded: newComment {{ '}' }}
对象)。
如果您需要访问注入的提供程序(例如,使用外部服务来验证数据),请使用以下构造。
@Subscription(returns => Comment, {
resolve(this: AuthorResolver, value) {
// "this" refers to an instance of "AuthorResolver"
return value;
}
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}
相同的构造适用于过滤器:
@Subscription(returns => Comment, {
filter(this: AuthorResolver, payload, variables) {
// "this" refers to an instance of "AuthorResolver"
return payload.commentAdded.title === variables.title;
}
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}
Schema first
为了在 Nest 中创建等效订阅,我们将使用 @Subscription()
装饰器。
const pubSub = new PubSub()
@Resolver('Author')
export class AuthorResolver {
// ...
@Subscription()
commentAdded() {
return pubSub.asyncIterator('commentAdded')
}
}
要根据上下文和参数过滤掉特定事件,请设置 filter
属性。
@Subscription('commentAdded', {
filter: (payload, variables) =>
payload.commentAdded.title === variables.title,
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}
要改变已发布的有效负载,我们可以使用 resolve
函数。
@Subscription('commentAdded', {
resolve: value => value,
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}
如果您需要访问注入的提供程序(例如,使用外部服务来验证数据),请使用以下构造:
@Subscription('commentAdded', {
resolve(this: AuthorResolver, value) {
// "this" refers to an instance of "AuthorResolver"
return value;
}
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}
相同的构造适用于过滤器:
@Subscription('commentAdded', {
filter(this: AuthorResolver, payload, variables) {
// "this" refers to an instance of "AuthorResolver"
return payload.commentAdded.title === variables.title;
}
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}
最后一步是更新类型定义文件。
type Author {
id: Int!
firstName: String
lastName: String
posts: [Post]
}
type Post {
id: Int!
title: String
votes: Int
}
type Query {
author(id: Int!): Author
}
type Comment {
id: String
content: String
}
type Subscription {
commentAdded(title: String!): Comment
}
这样,我们就创建了一个 commentAdded(title: String!): Comment
订阅。您可以在 此处 找到完整的示例实现。
PubSub
我们在上面实例化了一个本地 PubSub
实例。首选方法是将 PubSub
定义为 提供程序 并通过构造函数注入它(使用 @Inject()
装饰器)。这允许我们在整个应用程序中重复使用该实例。例如,按如下方式定义提供程序,然后在需要时注入 'PUB_SUB'
。
{
provide: 'PUB_SUB',
useValue: new PubSub(),
}
自定义订阅服务器
要自定义订阅服务器(例如,更改路径),请使用subscriptions
选项属性。
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'subscriptions-transport-ws': {
path: '/graphql'
},
}
})
如果您使用 graphql-ws
包进行订阅,请将 subscriptions-transport-ws
键替换为 graphql-ws
,如下所示:
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'graphql-ws': {
path: '/graphql'
},
}
})
通过 WebSockets 进行身份验证
可以在 onConnect
回调函数中检查用户是否经过身份验证,您可以在 subscriptions
选项中指定该回调函数。
onConnect
将接收传递给 SubscriptionClient
的 connectionParams
作为第一个参数(阅读更多内容](https://www.apollographql.com/docs/react/data/subscriptions/#5-authenticate-over-websocket-optional))。
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'subscriptions-transport-ws': {
onConnect: (connectionParams) => {
const authToken = connectionParams.authToken
if (!isValid(authToken)) {
throw new Error('Token is not valid')
}
// 从 token 中提取用户信息
const user = parseToken(authToken)
// 返回用户信息以便稍后将其添加到上下文中
return { user }
},
}
},
context: ({ connection }) => {
// connection.context 将等于`onConnect`回调返回的内容
},
})
本例中的 authToken
仅在首次建立连接时由客户端发送一次。
使用此连接进行的所有订阅都将具有相同的 authToken
,因此具有相同的用户信息。
subscriptions-transport-ws
中有一个错误,允许连接跳过 onConnect
阶段(阅读更多内容](https://github.com/apollographql/subscriptions-transport-ws/issues/349))。您不应假设在用户启动订阅时调用了 onConnect
,并且始终检查 context
是否已填充。
如果您使用的是 graphql-ws
包,则 onConnect
回调的签名会略有不同:
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'graphql-ws': {
onConnect: (context: Context<any>) => {
const { connectionParams, extra } = context
// 用户验证将与上例相同
// 使用 graphql-ws 时,附加上下文值应存储在附加字段中
extra.user = { user: {} }
},
},
},
context: ({ extra }) => {
// 现在您可以通过附加字段访问附加上下文值
},
})
使用 Mercurius 驱动程序启用订阅
要启用订阅,请将subscription
属性设置为true
。
GraphQLModule.forRoot<MercuriusDriverConfig>({
driver: MercuriusDriver,
subscription: true,
})
您还可以传递选项对象来设置自定义发射器、验证传入连接等。阅读更多此处(参见subscription
)。
代码优先
要使用代码优先方法创建订阅,我们使用@Subscription()
装饰器(从@nestjs/graphql
包导出)和mercurius
包中的PubSub
类,它提供了一个简单的发布/订阅API。
以下订阅处理程序通过调用PubSub#asyncIterator
来负责订阅事件。此方法采用单个参数,即triggerName
,它对应于事件主题名称。
@Resolver(of => Author)
export class AuthorResolver {
// ...
@Subscription(returns => Comment)
commentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded')
}
}
上面示例中使用的所有装饰器都从 @nestjs/graphql
包中导出,而 PubSub
类则从 mercurius
包中导出。
PubSub
是一个公开简单 publish
和 subscribe
API 的类。查看 此部分 了解如何注册自定义 PubSub
类。
这将导致在 SDL 中生成 GraphQL 架构的以下部分:
type Subscription {
commentAdded(): Comment!
}
请注意,根据定义,订阅返回一个具有单个顶级属性的对象,该属性的键是订阅的名称。此名称要么从订阅处理程序方法的名称(即上面的 commentAdded
)继承,要么通过将带有键 name
的选项作为第二个参数传递给 @Subscription()
装饰器来明确提供,如下所示。
@Subscription(returns => Comment, {
name: 'commentAdded',
})
subscribeToCommentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}
此构造产生与上一个代码示例相同的 SDL,但允许我们将方法名称与订阅分离。
发布
现在,要发布事件,我们使用 PubSub#publish
方法。这通常在突变中使用,以在对象图的一部分发生更改时触发客户端更新。例如:
@Mutation(returns => Post)
async addComment(
@Args('postId', { type: () => Int }) postId: number,
@Args('comment', { type: () => Comment }) comment: CommentInput,
@Context('pubsub') pubSub: PubSub,
) {
const newComment = this.commentsService.addComment({ id: postId, comment });
await pubSub.publish({
topic: 'commentAdded',
payload: {
commentAdded: newComment
}
});
return newComment;
}
如上所述,订阅按定义返回一个值,并且该值具有形状。再次查看我们为 commentAdded
订阅生成的 SDL:
type Subscription {
commentAdded(): Comment!
}
这告诉我们订阅必须返回一个对象,该对象的顶级属性名称为 commentAdded
,该对象的值是 Comment
对象。需要注意的重要一点是,PubSub#publish
方法发出的事件负载的形状必须与预期从订阅返回的值的形状相对应。因此,在上面的示例中,pubSub.publish({{ '{' }} topic: 'commentAdded', payload: {{ '{' }} commentAdded: newComment {{ '}' }} {{ '}' }})
语句发布具有适当形状负载的 commentAdded
事件。如果这些形状不匹配,您的订阅将在 GraphQL 验证阶段失败。
过滤订阅
要过滤掉特定事件,请将 filter
属性设置为过滤器函数。此函数的作用类似于传递给数组 filter
的函数。它接受两个参数:payload
包含事件有效负载(由事件发布者发送),variables
接受订阅请求期间传入的任何参数。它返回一个布尔值,确定是否应将此事件发布给客户端侦听器。
@Subscription(returns => Comment, {
filter: (payload, variables) =>
payload.commentAdded.title === variables.title,
})
commentAdded(@Args('title') title: string, @Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}
如果您需要访问注入的提供程序(例如,使用外部服务来验证数据),请使用以下构造。
@Subscription(returns => Comment, {
filter(this: AuthorResolver, payload, variables) {
// "this" refers to an instance of "AuthorResolver"
return payload.commentAdded.title === variables.title;
}
})
commentAdded(@Args('title') title: string, @Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}
架构优先
为了在 Nest 中创建等效订阅,我们将使用 @Subscription()
装饰器。
const pubSub = new PubSub()
@Resolver('Author')
export class AuthorResolver {
// ...
@Subscription()
commentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded')
}
}
要根据上下文和参数过滤掉特定事件,请设置 filter
属性。
@Subscription('commentAdded', {
filter: (payload, variables) =>
payload.commentAdded.title === variables.title,
})
commentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}
如果您需要访问注入的提供程序(例如,使用外部服务来验证数据),请使用以下构造:
@Subscription('commentAdded', {
filter(this: AuthorResolver, payload, variables) {
// "this" refers to an instance of "AuthorResolver"
return payload.commentAdded.title === variables.title;
}
})
commentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}
最后一步是更新类型定义文件。
type Author {
id: Int!
firstName: String
lastName: String
posts: [Post]
}
type Post {
id: Int!
title: String
votes: Int
}
type Query {
author(id: Int!): Author
}
type Comment {
id: String
content: String
}
type Subscription {
commentAdded(title: String!): Comment
}
这样,我们就创建了一个 commentAdded(title: String!): Comment
订阅。
PubSub
在上面的示例中,我们使用了默认的 PubSub
发射器 (mqemitter)
首选方法(用于生产)是使用 mqemitter-redis
。或者,可以提供自定义的 PubSub
实现阅读更多信息 此处
GraphQLModule.forRoot<MercuriusDriverConfig>({
driver: MercuriusDriver,
subscription: {
emitter: require('mqemitter-redis')({
port: 6579,
host: '127.0.0.1',
}),
},
})
通过 WebSockets 进行身份验证
可以在 verifyClient
回调函数中检查用户是否经过身份验证,您可以在 subscription
选项中指定该回调函数。
verifyClient
将接收 info
对象作为第一个参数,您可以使用该参数检索请求的标头。
GraphQLModule.forRoot<MercuriusDriverConfig>({
driver: MercuriusDriver,
subscription: {
verifyClient: (info, next) => {
const authorization = info.req.headers?.authorization as string
if (!authorization?.startsWith('Bearer ')) {
return next(false)
}
next(true)
},
}
})