Spring websocket 从多个线程发送消息



我正在为我的一个基于弹簧的项目使用 Spring WebSocket 服务器实现。我遇到了一个错误说The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is invalid state.我发现问题是同时从不同的线程写入 websocket。

我如何暂时修复它:考虑我已经实现了以下方法

void sendMessageToSession(WebsocketSession session,String message);

将文本消息发送到 websocket 会话。我无法使整个方法同步,因为多个线程可以为不同的 websocketSessions 和消息调用它。我也无法将会话放在同步块中(尝试过但不起作用)

虽然,我像这样解决了我的问题

synchronized(session.getId()){ 
//sending message;
}

我不再面临这个问题。但是在同步块中使用字符串似乎不是一个好的做法。 那么我还有什么其他解决方案呢?发送异步消息的最佳方式是什么?

PS:建立连接后我已经使用了ConcurrentWebSocketSessionDecorator,并且我正在使用更新的websocket。 没有帮助。

session = new ConcurrentWebSocketSessionDecorator(session, (int) StaticConfig.MAXIMUM_WS_ASYNC_SEND_TIMEOUT, StaticConfig.MAXIMUM_WS_BINARY_BUFFER_SIZE * 2);

注意,我将我的websocet会话保存在映射中,其中键是session.getId,值是会话本身。

与其他一些 websocket 实现不同,Spring websocket 引用似乎在每个消息上都不相等。我通过会话的ID将会话保存在地图中,并且在每条消息上,我检查传递的websocket与我已经放在地图上的websocket的相等性,它是假的。

通过在我坚持会话的WebsocketSession后面添加volatile关键字,我解决了这个问题。我很高兴知道这是否也是一种不好的做法。但我的想法是,当从多个线程写入 websocket 会话时,这些线程会丢失 websocket 的状态,因为它尚未更新,这就是抛出此异常的原因。

通过添加易失性,我们确保在另一个线程使用它之前更新了 websocket 状态,以便写入 websocket 按预期同步工作。

我创建了一个名为SessionData的类,其中包含 websocketSession 和我需要的所有其他有关会话的数据。

public class SessionData {
private volatile WebSocketSession websocketSession;
//...other 
// getters and setters ...
}

我使用会话数据作为映射的值,其中会话 ID 是键

然后,当从SessionData获取websocketSession并从不同的线程写入它时,volatile 帮助我获得了更新的websocketSession。

<小时 />

更新 (2020)

这里的一个关键说明是,每次要向会话发送消息时都应使用sessionData.getWebsocketSession.sendMessage(...)。切勿直接使用会话,这意味着这样的代码是一种不好的做法

WebSocketSession websocketSession = sessionData.getWebSocketSession();
websocketSession.sendMessage(...);

您永远不会知道这两行代码之间的 websocket 会话应用了哪些更改(在您的情况下可能超过 2 行)。

像这样的代码更好:

sessionData.getWebSocketSession().sendMessage(...);

也不要直接发布到在 Spring websocketMessageHandlers 中传递给您的会话中,否则您可能会再次收到该错误。

这就是为什么在连接打开时将WebSocketSessionsessionId映射到SessionData的良好做法。您可以使用此存储库通过会话 ID 获取volatile session,而不是直接使用会话。

ConcurrentWebSocketSessionDecorator就像多线程中的魅力,它是为它设计的。 您的地图实现可能有问题。

示例代码:

private final Map<String, SessionData> sessions = new ConcurrentHashMap<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception 
{
// Use the following will crash :
//sessions.put(session.getId(), new SessionData(session));
// Use ConcurrentWebSocketSessionDecorator is safe :
sessions.put(session.getId(), new SessionData(new ConcurrentWebSocketSessionDecorator (session, 2000, 4096)));
super.afterConnectionEstablished(session);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception
{
sessions.remove(session.getId());
super.afterConnectionClosed(session, status); 
}
public void send(WebSocketSession session, String msg) throws MessagingException {
try {
session.sendMessage (new TextMessage(msg));
} catch (IOException ex) {
throw new MessagingException(ex.getMessage());
}
}

为了轻松测试多线程中的行为:

public void sendMT(WebSocketSession session, String msg) throws MessagingException{
for (int i=0; i<3; i++){
new Thread(){
@Override
public void run(){
send (session, msg);
}.start();  
}
}

请参阅 SubProtocolWebSocketHandler 作为ConcurrentWebSocketSessionDecorator的经过验证的用法。

最新更新