频繁发送到spring-websocket会话:在传输中丢失



我得到了一个spring websocket服务器(基于Jetty和spring版本4.3. release)和客户端的负载测试设置,它生成了许多连接(基于spring的示例java websocket客户端)。下面的代码将数据发送到给定的websocket会话:代码片段利用了可以使用sessionId而不是用户ID的情况(Spring websocket @SendToSession:向特定会话发送消息)。我可能会经常执行这段代码,每2-3毫秒一次。我使用SimpleMessageBroker

 public void publishToSessionUsingTopic(String sessionId, String subscriptionTopic, Map<String, CacheRowModel> payload) {
        String subscriptionTopicWithoutUser = subscriptionTopic.replace(USER_ENDPOINT, "");
        // necessary message headers for per-session send
        SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
        headerAccessor.setSessionId(sessionId);
        headerAccessor.setLeaveMutable(true);          
        simpMessagingTemplate.convertAndSendToUser(sessionId, subscriptionTopicWithoutUser, Collections.singletonList(payload), headerAccessor.getMessageHeaders());
}

当这段代码非常频繁地执行(每2-3毫秒一次)约100个会话时,虽然我在日志中看到它被运行并称为convertAndSendToUser,但有些会话不会接收到消息。我很感谢任何关于如何解决这个问题的建议。

嗯,我想你的问题出在:

@Bean
public ThreadPoolTaskExecutor clientOutboundChannelExecutor() {
    TaskExecutorRegistration reg = getClientOutboundChannelRegistration().getOrCreateTaskExecRegistration();
    ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
    executor.setThreadNamePrefix("clientOutboundChannel-");
    return executor;
}

使用Executor的配置:

protected ThreadPoolTaskExecutor getTaskExecutor() {
    ThreadPoolTaskExecutor executor = (this.taskExecutor != null ? this.taskExecutor : new ThreadPoolTaskExecutor());
    executor.setCorePoolSize(this.corePoolSize);
    executor.setMaxPoolSize(this.maxPoolSize);
    executor.setKeepAliveSeconds(this.keepAliveSeconds);
    executor.setQueueCapacity(this.queueCapacity);
    executor.setAllowCoreThreadTimeOut(true);
    return executor;
}

看,没有配置RejectedExecutionHandler。默认情况下是:

private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();

所以,当你有足够多的消息和任务超过ThreadPool时,任何额外的都将被中止。

要解决这个问题,您应该实现WebSocketMessageBrokerConfigurer并覆盖其configureClientOutboundChannel()以提供一些自定义taskExecutor(ThreadPoolTaskExecutor taskExecutor),例如使用new ThreadPoolExecutor.CallerRunsPolicy()

相关内容

  • 没有找到相关文章

最新更新