我正在尝试创建一个示例Nest.js应用程序,使用socket.io进行websocket连接,该应用程序使用Redis Adapter将事件发布到多个微服务。
在Redis v4中,当创建客户端时,它似乎不再自动连接。
我有以下适配器:
import { IoAdapter } from '@nestjs/platform-socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
import { ServerOptions } from 'socket.io';
const pubClient = createClient({
url: 'redis://redis:6379',
password: 'pass',
});
const subClient = pubClient.duplicate();
const redisAdapter = createAdapter(pubClient, subClient);
export class RedisIoAdapter extends IoAdapter {
override createIOServer(port: number, options?: ServerOptions): any {
const server = super.createIOServer(port, options);
server.adapter(redisAdapter);
return server;
}
}
当调用this.server.to('room').emit()
时,我收到以下错误:
server | /app/node_modules/@node-redis/client/dist/lib/client/index.js:407
server | return Promise.reject(new errors_1.ClientClosedError());
server | ^
server |
server | ClientClosedError: The client is closed
server | at Commander._RedisClient_sendCommand (/app/node_modules/@node-redis/client/dist/lib/client/index.js:407:31)
server | at Commander.commandsExecutor (/app/node_modules/@node-redis/client/dist/lib/client/index.js:166:154)
server | at Commander.BaseClass.<computed> [as publish] (/app/node_modules/@node-redis/client/dist/lib/commander.js:8:29)
server | at RedisAdapter.broadcast (/app/node_modules/@socket.io/redis-adapter/dist/index.js:406:28)
server | at BroadcastOperator.emit (/app/node_modules/socket.io/dist/broadcast-operator.js:109:22)
server | at AppGateway.handleMessage (/app/dist/app.gateway.js:21:30)
server | at /app/node_modules/@nestjs/websockets/context/ws-context-creator.js:43:33
server | at processTicksAndRejections (node:internal/process/task_queues:96:5)
server | at async AppGateway.<anonymous> (/app/node_modules/@nestjs/websockets/context/ws-proxy.js:11:32)
server | at async WebSocketsController.pickResult (/app/node_modules/@nestjs/websockets/web-sockets-controller.js:91:24)
server |
server | Node.js v17.4.0
server exited with code 1
我尝试降级到"redis": "^3.1.2"
和"socket.io-redis": "^6.0.0"
,并更新了相关代码(例如,使用RedisClient
而不是createClient
(,一切似乎都很好(我在使用三台服务器的k8s中尝试过,所有客户端都收到了消息(。
不过,我想使用最新的版本。考虑到RedisClient.connect
是一个异步函数,在这种情况下,连接Redis的正确方式是什么?createIOServer
也不是一个异步函数,所以我也不能在其中调用connect
。
仅供参考,这是我的main.ts
文件:
async function bootstrap() {
const app = await NestFactory.create<NestExpressApplication>(AppModule);
app.useWebSocketAdapter(new RedisIoAdapter(app));
await app.listen(3000);
}
bootstrap();
和app.gateway.ts
:
import { SubscribeMessage, WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
type Payload = {
name: String;
text: String;
};
@WebSocketGateway({
cors: {
origin: '*',
},
})
export class AppGateway {
@WebSocketServer() server: Server;
@SubscribeMessage('msgToServer')
handleMessage(client: Socket, payload: Payload) {
this.server.to('msgRoom').emit('msgToClient', payload);
}
handleConnection(client: Socket, ...args: any[]) {
client.join('msgRoom');
}
}
您应该使用redis3npm i --save redis@3
。只有微服务文档告诉我们这一点,但对整个Nest包都有效。
import { IoAdapter } from '@nestjs/platform-socket.io';
import { Server, ServerOptions } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
import { INestApplication } from '@nestjs/common';
import { ConfigurationService } from 'src/configuration/configuration.service';
export class RedisIoAdapter extends IoAdapter {
protected redisAdapter;
constructor(app: INestApplication) {
super(app);
const configService = app.get(ConfigurationService);
const pubClient = createClient({
host: configService.get('REDIS_HOST'),
port: configService.get('REDIS_PORT'),
});
const subClient = pubClient.duplicate();
this.redisAdapter = createAdapter(pubClient, subClient);
}
createIOServer(port: number, options?: ServerOptions) {
const server = super.createIOServer(port, options) as Server;
server.adapter(this.redisAdapter);
return server;
}
}
它与redis@4(https://socket.io/docs/v4/redis-adapter/#usage)
import { IoAdapter } from '@nestjs/platform-socket.io';
import { Server, ServerOptions } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
import { INestApplication } from '@nestjs/common';
import { ConfigurationService } from 'src/configuration/configuration.service';
export class RedisIoAdapter extends IoAdapter {
protected redisAdapter;
constructor(app: INestApplication) {
super(app);
const configService = app.get(ConfigurationService);
const pubClient = createClient({
socket: {
host: configService.get('REDIS_HOST'),
port: configService.get('REDIS_PORT'),
},
});
const subClient = pubClient.duplicate();
pubClient.connect(); // <------
subClient.connect(); // <------
this.redisAdapter = createAdapter(pubClient, subClient);
}
createIOServer(port: number, options?: ServerOptions) {
const server = super.createIOServer(port, options) as Server;
server.adapter(this.redisAdapter);
return server;
}
}
我在redis-io.adapter.ts
中添加了一个连接到客户端的函数:
export async function connectRedis() {
if (pubClient.isOpen && subClient.isOpen) return;
await Promise.all([pubClient.connect(), subClient.connect()]);
}
我在main.ts
叫它,就在useWebSocketAdapter
之前
async function bootstrap() {
const app = await NestFactory.create<NestExpressApplication>(AppModule);
await connectRedis();
app.useWebSocketAdapter(new RedisIoAdapter(app));
await app.listen(3000);
}
bootstrap();
我还将createAdapter
调用移到了createIOServer
中,这样,无论何时创建适配器,Redis客户端都将始终处于连接状态。现在似乎工作得很好,就像旧版本一样。