The WebSockets module is platform-agnostic, hence, you can bring your own library (or even a native implementation) by making use of WebSocketAdapter
interface. This interface forces to implement few methods described in the following table:
create | Creates a socket instance based on passed arguments |
bindClientConnect | Binds the client connection event |
bindClientDisconnect | Binds the client disconnection event (optional*) |
bindMessageHandlers | Binds the incoming message to the corresponding message handler |
close | Terminates a server instance |
扩展 socket.io
socket.io 包被包装在 IoAdapter
类中。如果您想增强适配器的基本功能,该怎么办?例如,您的技术要求需要能够在 Web 服务的多个负载平衡实例之间广播事件。为此,您可以扩展 IoAdapter
并覆盖一个方法,该方法负责实例化新的 socket.io 服务器。但首先,让我们安装所需的包。
要将 socket.io 与多个负载平衡实例一起使用,您必须通过在客户端 socket.io 配置中设置 transports: ['websocket']
来禁用轮询,或者您必须在负载平衡器中启用基于 cookie 的路由。仅靠 Redis 是不够的。有关更多信息,请参阅此处。
$ npm i --save redis socket.io @socket.io/redis-adapter
一旦安装了包,我们就可以创建一个RedisIoAdapter
类。
import { IoAdapter } from '@nestjs/platform-socket.io'
import { ServerOptions } from 'socket.io'
import { createAdapter } from '@socket.io/redis-adapter'
import { createClient } from 'redis'
export class RedisIoAdapter extends IoAdapter {
private adapterConstructor: ReturnType<typeof createAdapter>
async connectToRedis(): Promise<void> {
const pubClient = createClient({ url: `redis://localhost:6379` })
const subClient = pubClient.duplicate()
await Promise.all([pubClient.connect(), subClient.connect()])
this.adapterConstructor = createAdapter(pubClient, subClient)
}
createIOServer(port: number, options?: ServerOptions): any {
const server = super.createIOServer(port, options)
server.adapter(this.adapterConstructor)
return server
}
}
之后,只需切换到新创建的 Redis 适配器。
const app = await NestFactory.create(AppModule)
const redisIoAdapter = new RedisIoAdapter(app)
await redisIoAdapter.connectToRedis()
app.useWebSocketAdapter(redisIoAdapter)
Ws 库
另一个可用的适配器是 WsAdapter
,它充当框架之间的代理,并集成速度极快且经过全面测试的 ws 库。此适配器与本机浏览器 WebSockets 完全兼容,并且比 socket.io 包快得多。不幸的是,它的开箱即用功能明显较少。在某些情况下,您可能并不一定需要它们。
ws
库不支持命名空间(socket.io
推广的通信通道)。但是,为了以某种方式模仿此功能,您可以在不同的路径上安装多个 ws
服务器(例如:@WebSocketGateway({{ '{' }} path: '/users' {{ '}' }})
)。
为了使用 ws
,我们首先必须安装所需的包:
$ npm i --save @nestjs/platform-ws
安装包后,我们可以切换适配器:
const app = await NestFactory.create(AppModule)
app.useWebSocketAdapter(new WsAdapter(app))
The WsAdapter
is imported from @nestjs/platform-ws
.
高级(自定义适配器)
为了演示目的,我们将手动集成 ws 库。如前所述,此库的适配器已创建,并从 @nestjs/platform-ws
包中作为 WsAdapter
类公开。简化的实现可能如下所示:
import * as WebSocket from 'ws'
import { INestApplicationContext, WebSocketAdapter } from '@nestjs/common'
import { MessageMappingProperties } from '@nestjs/websockets'
import { EMPTY, Observable, fromEvent } from 'rxjs'
import { filter, mergeMap } from 'rxjs/operators'
export class WsAdapter implements WebSocketAdapter {
constructor(private app: INestApplicationContext) {}
create(port: number, options: any = {}): any {
return new WebSocket.Server({ port, ...options })
}
bindClientConnect(server, callback: Function) {
server.on('connection', callback)
}
bindMessageHandlers(
client: WebSocket,
handlers: MessageMappingProperties[],
process: (data: any) => Observable<any>,
) {
fromEvent(client, 'message')
.pipe(
mergeMap(data => this.bindMessageHandler(data, handlers, process)),
filter(result => result),
)
.subscribe(response => client.send(JSON.stringify(response)))
}
bindMessageHandler(
buffer,
handlers: MessageMappingProperties[],
process: (data: any) => Observable<any>,
): Observable<any> {
const message = JSON.parse(buffer.data)
const messageHandler = handlers.find(
handler => handler.message === message.event,
)
if (!messageHandler) {
return EMPTY
}
return process(messageHandler.callback(message.data))
}
close(server) {
server.close()
}
}
当你想利用 ws 库时,请使用内置的 WsAdapter
,而不是创建自己的。
然后,我们可以使用 useWebSocketAdapter()
方法设置自定义适配器:
const app = await NestFactory.create(AppModule)
app.useWebSocketAdapter(new WsAdapter(app))
示例
此处 提供了一个使用WsAdapter
的工作示例。