我正在为我的一个基于弹簧的项目使用 Spring WebSocket 服务器实现。我遇到了一个错误说The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is invalid state
.我发现问题是同时从不同的线程写入 websocket。
我如何暂时修复它:考虑我已经实现了以下方法
void sendMessageToSession(WebsocketSession session,String message);
将文本消息发送到 websocket 会话。我无法使整个方法同步,因为多个线程可以为不同的 websocketSessions 和消息调用它。我也无法将会话放在同步块中(尝试过但不起作用)
虽然,我像这样解决了我的问题
synchronized(session.getId()){
//sending message;
}
我不再面临这个问题。但是在同步块中使用字符串似乎不是一个好的做法。 那么我还有什么其他解决方案呢?发送异步消息的最佳方式是什么?
PS:建立连接后我已经使用了ConcurrentWebSocketSessionDecorator
,并且我正在使用更新的websocket。 没有帮助。
session = new ConcurrentWebSocketSessionDecorator(session, (int) StaticConfig.MAXIMUM_WS_ASYNC_SEND_TIMEOUT, StaticConfig.MAXIMUM_WS_BINARY_BUFFER_SIZE * 2);
注意,我将我的websocet会话保存在映射中,其中键是session.getId,值是会话本身。
与其他一些 websocket 实现不同,Spring websocket 引用似乎在每个消息上都不相等。我通过会话的ID将会话保存在地图中,并且在每条消息上,我检查传递的websocket与我已经放在地图上的websocket的相等性,它是假的。
通过在我坚持会话的WebsocketSession
后面添加volatile
关键字,我解决了这个问题。我很高兴知道这是否也是一种不好的做法。但我的想法是,当从多个线程写入 websocket 会话时,这些线程会丢失 websocket 的状态,因为它尚未更新,这就是抛出此异常的原因。
通过添加易失性,我们确保在另一个线程使用它之前更新了 websocket 状态,以便写入 websocket 按预期同步工作。
我创建了一个名为SessionData
的类,其中包含 websocketSession 和我需要的所有其他有关会话的数据。
public class SessionData {
private volatile WebSocketSession websocketSession;
//...other
// getters and setters ...
}
我使用会话数据作为映射的值,其中会话 ID 是键
然后,当从SessionData获取websocketSession并从不同的线程写入它时,volatile 帮助我获得了更新的websocketSession。
<小时 />更新 (2020)
这里的一个关键说明是,每次要向会话发送消息时都应使用sessionData.getWebsocketSession.sendMessage(...)
。切勿直接使用会话,这意味着这样的代码是一种不好的做法:
WebSocketSession websocketSession = sessionData.getWebSocketSession();
websocketSession.sendMessage(...);
您永远不会知道这两行代码之间的 websocket 会话应用了哪些更改(在您的情况下可能超过 2 行)。
像这样的代码更好:
sessionData.getWebSocketSession().sendMessage(...);
也不要直接发布到在 Spring websocketMessageHandler
s 中传递给您的会话中,否则您可能会再次收到该错误。
这就是为什么在连接打开时将WebSocketSession
sessionId
映射到SessionData
的良好做法。您可以使用此存储库通过会话 ID 获取volatile session
,而不是直接使用会话。
ConcurrentWebSocketSessionDecorator就像多线程中的魅力,它是为它设计的。 您的地图实现可能有问题。
示例代码:
private final Map<String, SessionData> sessions = new ConcurrentHashMap<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception
{
// Use the following will crash :
//sessions.put(session.getId(), new SessionData(session));
// Use ConcurrentWebSocketSessionDecorator is safe :
sessions.put(session.getId(), new SessionData(new ConcurrentWebSocketSessionDecorator (session, 2000, 4096)));
super.afterConnectionEstablished(session);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception
{
sessions.remove(session.getId());
super.afterConnectionClosed(session, status);
}
public void send(WebSocketSession session, String msg) throws MessagingException {
try {
session.sendMessage (new TextMessage(msg));
} catch (IOException ex) {
throw new MessagingException(ex.getMessage());
}
}
为了轻松测试多线程中的行为:
public void sendMT(WebSocketSession session, String msg) throws MessagingException{
for (int i=0; i<3; i++){
new Thread(){
@Override
public void run(){
send (session, msg);
}.start();
}
}
请参阅 SubProtocolWebSocketHandler 作为ConcurrentWebSocketSessionDecorator
的经过验证的用法。