What is the proper way to use socket.io with the Redis adapter in Nest.js?



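I am trying to create a sample Nest.js application that uses socket.io for websocket connections and the Redis adapter to publish events across multiple microservices.

In Redis v4, a client no longer seems to connect automatically when it is created.

I have the following adapter: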

import { IoAdapter } from '@nestjs/platform-socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
import { ServerOptions } from 'socket.io';

const pubClient = createClient({
  url: 'redis://redis:6379',
  password: 'pass',
});
const subClient = pubClient.duplicate();
const redisAdapter = createAdapter(pubClient, subClient);

export class RedisIoAdapter extends IoAdapter {
  override createIOServer(port: number, options?: ServerOptions): any {
    const server = super.createIOServer(port, options);
    server.adapter(redisAdapter);
    return server;
  }
}

When this.server.to('room').emit() is called, I get the following error:

server  | /app/node_modules/@node-redis/client/dist/lib/client/index.js:407
server  |         return Promise.reject(new errors_1.ClientClosedError());
server  |                               ^
server  | 
server  | ClientClosedError: The client is closed
server  |     at Commander._RedisClient_sendCommand (/app/node_modules/@node-redis/client/dist/lib/client/index.js:407:31)
server  |     at Commander.commandsExecutor (/app/node_modules/@node-redis/client/dist/lib/client/index.js:166:154)
server  |     at Commander.BaseClass.<computed> [as publish] (/app/node_modules/@node-redis/client/dist/lib/commander.js:8:29)
server  |     at RedisAdapter.broadcast (/app/node_modules/@socket.io/redis-adapter/dist/index.js:406:28)
server  |     at BroadcastOperator.emit (/app/node_modules/socket.io/dist/broadcast-operator.js:109:22)
server  |     at AppGateway.handleMessage (/app/dist/app.gateway.js:21:30)
server  |     at /app/node_modules/@nestjs/websockets/context/ws-context-creator.js:43:33
server  |     at processTicksAndRejections (node:internal/process/task_queues:96:5)
server  |     at async AppGateway.<anonymous> (/app/node_modules/@nestjs/websockets/context/ws-proxy.js:11:32)
server  |     at async WebSocketsController.pickResult (/app/node_modules/@nestjs/websockets/web-sockets-controller.js:91:24)
server  | 
server  | Node.js v17.4.0
server exited with code 1

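I tried downgrading to "redis": "^3.1.2" and "socket.io-redis": "^6.0.0" and updated the related code (for example, using RedisClient instead of createClient), and everything seemed to work fine (I tried it on k8s with three servers, and all clients received the messages).

However, I would like to use the latest versions. Given that RedisClient.connect is an async function, what is the proper way to connect to Redis in this case? createIOServer is not an async function either, so I cannot call connect inside it.


For reference, this is my main.ts file: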

async function bootstrap() {
  const app = await NestFactory.create<NestExpressApplication>(AppModule);
  app.useWebSocketAdapter(new RedisIoAdapter(app));
  await app.listen(3000);
}
bootstrap();

app.gateway.ts:

import { SubscribeMessage, WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';

type Payload = {
  name: String;
  text: String;
};

@WebSocketGateway({
  cors: {
    origin: '*',
  },
})
export class AppGateway {
  @WebSocketServer() server: Server;

  @SubscribeMessage('msgToServer')
  handleMessage(client: Socket, payload: Payload) {
    this.server.to('msgRoom').emit('msgToClient', payload);
  }

  handleConnection(client: Socket, ...args: any[]) {
    client.join('msgRoom');
  }
}

You should use redis 3: npm i --save redis@3. Only the microservices documentation mentions this, but it applies to the whole Nest package.

import { IoAdapter } from '@nestjs/platform-socket.io';
import { Server, ServerOptions } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
import { INestApplication } from '@nestjs/common';
import { ConfigurationService } from 'src/configuration/configuration.service';

export class RedisIoAdapter extends IoAdapter {
  protected redisAdapter;

  constructor(app: INestApplication) {
    super(app);
    const configService = app.get(ConfigurationService);
    const pubClient = createClient({
      host: configService.get('REDIS_HOST'),
      port: configService.get('REDIS_PORT'),
    });
    const subClient = pubClient.duplicate();
    this.redisAdapter = createAdapter(pubClient, subClient);
  }

  createIOServer(port: number, options?: ServerOptions) {
    const server = super.createIOServer(port, options) as Server;
    server.adapter(this.redisAdapter);
    return server;
  }
}

It also works with redis@4 (https://socket.io/docs/v4/redis-adapter/#usage):

import { IoAdapter } from '@nestjs/platform-socket.io';
import { Server, ServerOptions } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
import { INestApplication } from '@nestjs/common';
import { ConfigurationService } from 'src/configuration/configuration.service';

export class RedisIoAdapter extends IoAdapter {
  protected redisAdapter;

  constructor(app: INestApplication) {
    super(app);
    const configService = app.get(ConfigurationService);
    // redis@4 nests host and port under the `socket` option.
    const pubClient = createClient({
      socket: {
        host: configService.get('REDIS_HOST'),
        port: configService.get('REDIS_PORT'),
      },
    });
    const subClient = pubClient.duplicate();
    // redis@4 clients must be connected explicitly. connect() returns a
    // promise that is not awaited here, because a constructor cannot await.
    pubClient.connect();
    subClient.connect();
    this.redisAdapter = createAdapter(pubClient, subClient);
  }

  createIOServer(port: number, options?: ServerOptions) {
    const server = super.createIOServer(port, options) as Server;
    server.adapter(this.redisAdapter);
    return server;
  }
}
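Because a constructor cannot await the two connect() calls, a connection failure would only surface as an unhandled promise rejection. A minimal sketch of a helper that can be awaited during bootstrap instead, assuming only that each client exposes `isOpen` and `connect()` as redis@4 clients do. The names `ConnectableClient`, `connectAll`, and `FakeClient` are hypothetical and exist only for this illustration:

```typescript
// Structural type covering the two members used here; redis@4 clients
// expose both `isOpen` and `connect()`.
interface ConnectableClient {
  readonly isOpen: boolean;
  connect(): Promise<unknown>;
}

// Connect every client that is not already open. Calling it again later
// is a no-op for clients that are already open.
async function connectAll(clients: ConnectableClient[]): Promise<void> {
  await Promise.all(
    clients.filter((client) => !client.isOpen).map((client) => client.connect()),
  );
}

// Hypothetical stand-in so the helper can be tried without a Redis server.
class FakeClient implements ConnectableClient {
  isOpen = false;
  connectCalls = 0;

  async connect(): Promise<void> {
    this.connectCalls += 1;
    this.isOpen = true;
  }
}
```

With real clients, this would be called as `await connectAll([pubClient, subClient])` before the adapter is handed to Nest, so a failed connection rejects inside bootstrap() where it can be handled.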

I added a function to redis-io.adapter.ts that connects the clients:

export async function connectRedis() {
  if (pubClient.isOpen && subClient.isOpen) return;
  await Promise.all([pubClient.connect(), subClient.connect()]);
}

I call it in main.ts, right before useWebSocketAdapter:

async function bootstrap() {
  const app = await NestFactory.create<NestExpressApplication>(AppModule);
  await connectRedis();
  app.useWebSocketAdapter(new RedisIoAdapter(app));
  await app.listen(3000);
}
bootstrap();

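I also moved the createAdapter call into createIOServer, so that the Redis clients are always connected by the time the adapter is created. It now seems to work fine, just like the old versions.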
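The idea of creating the adapter only after the clients are connected can be made explicit with a small guard. This is a sketch under assumptions, not part of Nest or socket.io: `ClientLike` and `buildAdapterWhenConnected` are hypothetical names, and the only thing assumed about the clients is a boolean `isOpen` property, which redis@4 clients expose.

```typescript
// Only the property the guard needs; redis@4 clients expose `isOpen`.
interface ClientLike {
  readonly isOpen: boolean;
}

// Build the adapter through the supplied factory, but fail fast with a
// clear message if either client has not been connected yet.
function buildAdapterWhenConnected<C extends ClientLike, A>(
  pubClient: C,
  subClient: C,
  createAdapter: (pub: C, sub: C) => A,
): A {
  if (!pubClient.isOpen || !subClient.isOpen) {
    throw new Error('Redis clients must be connected before the adapter is created');
  }
  return createAdapter(pubClient, subClient);
}
```

Inside createIOServer this could be used as `server.adapter(buildAdapterWhenConnected(pubClient, subClient, createAdapter))`, which turns a forgotten connectRedis() call into an error at startup rather than a ClientClosedError on the first emit.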
