我得到了一个spring websocket服务器(基于Jetty和spring版本4.3. release)和客户端的负载测试设置,它生成了许多连接(基于spring的示例java websocket客户端)。下面的代码将数据发送到给定的websocket会话:代码片段利用了可以使用sessionId而不是用户ID的情况(Spring websocket @SendToSession:向特定会话发送消息)。我可能会经常执行这段代码,每2-3毫秒一次。我使用SimpleMessageBroker
public void publishToSessionUsingTopic(String sessionId, String subscriptionTopic, Map<String, CacheRowModel> payload) {
String subscriptionTopicWithoutUser = subscriptionTopic.replace(USER_ENDPOINT, "");
// necessary message headers for per-session send
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
headerAccessor.setSessionId(sessionId);
headerAccessor.setLeaveMutable(true);
simpMessagingTemplate.convertAndSendToUser(sessionId, subscriptionTopicWithoutUser, Collections.singletonList(payload), headerAccessor.getMessageHeaders());
}
当这段代码非常频繁地执行(每2-3毫秒一次)约100个会话时,虽然我在日志中看到它被运行并称为convertAndSendToUser,但有些会话不会接收到消息。我很感谢任何关于如何解决这个问题的建议。
嗯,我想你的问题出在:
@Bean
public ThreadPoolTaskExecutor clientOutboundChannelExecutor() {
TaskExecutorRegistration reg = getClientOutboundChannelRegistration().getOrCreateTaskExecRegistration();
ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
executor.setThreadNamePrefix("clientOutboundChannel-");
return executor;
}
使用Executor
的配置:
protected ThreadPoolTaskExecutor getTaskExecutor() {
ThreadPoolTaskExecutor executor = (this.taskExecutor != null ? this.taskExecutor : new ThreadPoolTaskExecutor());
executor.setCorePoolSize(this.corePoolSize);
executor.setMaxPoolSize(this.maxPoolSize);
executor.setKeepAliveSeconds(this.keepAliveSeconds);
executor.setQueueCapacity(this.queueCapacity);
executor.setAllowCoreThreadTimeOut(true);
return executor;
}
看,没有配置RejectedExecutionHandler
。默认情况下是:
private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
所以,当你有足够多的消息和任务超过ThreadPool
时,任何额外的都将被中止。
要解决这个问题,您应该实现WebSocketMessageBrokerConfigurer
并覆盖其configureClientOutboundChannel()
以提供一些自定义taskExecutor(ThreadPoolTaskExecutor taskExecutor)
,例如使用new ThreadPoolExecutor.CallerRunsPolicy()
。