多服务器环境中的用户目标?(Spring WebSocket 和 RabbitMQ)



Spring WebSockets 的文档指出:

4.4.13. 用户目的地

应用程序可以发送针对特定用户的消息,Spring 的 STOMP 支持可以识别以"/user/"为前缀的目的地。例如,客户端可能订阅目标"/user/queue/position-updates"。此目标将由 UserDestinationMessageHandler 处理,并转换为用户会话独有的目标,例如"/queue/position-updates-user123"。这提供了订阅通用名称目的地的便利,同时确保与订阅同一目的地的其他用户不会发生冲突,以便每个用户都可以接收唯一的股票头寸更新。

这是否应该在以 RabbitMQ 作为代理的多服务器环境中工作?

据我所知,用户的队列名称是通过附加simpSessionId生成的。使用推荐的客户端库 tomp 时.js这会导致第一个用户获取队列名称"/queue/position-updates-user0",下一个用户获取"/queue/position-updates-user1",依此类推。 这反过来意味着连接到不同服务器的第一批用户将订阅相同的队列("/queue/position-updates-user0")。

我可以在文档中找到的唯一参考是:

在多应用程序服务器方案中,用户目标可能仍未解析,因为用户连接到不同的服务器。在这种情况下,您可以配置目标以广播未解析的消息,以便其他服务器有机会尝试。这可以通过 Java config 中 MessageBrokerRegistry 的 userDestinationBroadcast 属性和 XML 中消息代理元素的用户-目的地-广播属性来完成。

但这只能使从与建立 Web 套接字的服务器不同的服务器与用户进行通信成为可能。

我觉得我错过了什么?有没有办法将Spring配置为能够在多服务器环境中安全地使用MessagingTemplate.convertAndSendToUser(principal.getName(), destination, payload)

如果他们需要进行身份验证(我假设他们的凭据存储在数据库中),您可以随时使用他们的数据库唯一用户 ID 进行订阅。

我所做的是当用户登录时,他们会自动订阅两个主题,一个是系统范围广播account|system主题,另一个是特定广播account|<userId>主题。

您可以尝试类似notification|<userid>的东西,让每个人都订阅,然后向该主题发送消息,他们将收到它。

由于用户 ID 对于每个用户都是唯一的,因此只要每个环境都访问相同的数据库信息,在群集环境中就不会出现问题。

这是我的发送方法:

public static boolean send(Object msg, String topic) {
try {
String destination = topic;
String payload = toJson(msg); //jsonfiy the message 
Message<byte[]> message = MessageBuilder.withPayload(payload.getBytes("UTF-8")).build();
template.send(destination, message);
return true;
} catch (Exception ex) {
logger.error(CommService.class.getName(), ex);
return false;
}
}

我的目的地是预先格式化的,所以如果我想向用户发送一条 id 为一个的消息,目的地看起来像/topic/account|1.

我创建了一个乒乓球控制器,为连接的用户测试 websockets,以查看他们的环境是否允许 websockets。我不知道这是否会对您有所帮助,但这在我的集群环境中确实有效。

/**
* Play ping pong between the client and server to see if web sockets work
* @param input the ping pong input
* @return the return data to check for connectivity
* @throws Exception exception
*/
@MessageMapping("/ping")
@SendToUser(value="/queue/pong", broadcast=false) // send only to the session that sent the request
public PingPong ping(PingPong input) throws Exception {
int receivedBytes = input.getData().length;
int pullBytes = input.getPull();
PingPong response = input;
if (pullBytes == 0) {
response.setData(new byte[0]);
} else if (pullBytes != receivedBytes)  {
// create random byte array
byte[] data =  randomService.nextBytes(pullBytes);
response.setData(data);
}
return response;
}

最新更新