春云流 - 慰藉 PubSub+ - 消费者并发



我正在使用Spring Cloud Stream 3.0.6(Cloud:Hoxton.SR6,Boot 2.3.0.RELEASE(与Solace PubSub+结合使用。 我无法让并发使用者工作。无论我配置什么,总有一个线程依次执行每个传入的消息。

这是我的StreamListener代码:

@StreamListener(JobTriggerEventConsumerBinding.INPUT)
protected void onJobTriggerEvent(org.springframework.messaging.Message<JobExecutionTriggerEvent> message, 
JobExecutionTriggerEvent event, 
MessageHeaders headers) throws InterruptedException {

log.info("Processing on thread: " + Thread.currentThread().getId());

Thread.sleep(5000);

log.info("Received the event!");
log.info("-- Raw message:    {}", message);
log.info("-- Headers:        {}", headers);
log.info("-- Event:          {}", event);
log.info("-- Event Contents: {}", event.getMessage());
}

如果我向输入通道发送 3 条消息(使用我编写的生产者应用程序(,我会看到消息在同一线程(具有相同的 ID(上按顺序处理。我想实现的是消息由 3 个线程并发处理。

我的application.yml如下所示:

spring:
cloud:
stream:
default:
group: defaultConsumers
consumer:
concurrency: 3
bindings:
jobTriggers:
group: jobTriggerConsumers 
consumer:
concurrency: 3
max-attempts: 1
solace: 
bindings:
jobTriggers:
consumer:
requeue-rejected: true

我的pom.xml包含以下依赖项:

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<!-- Dependency to Solace PubSub+ Spring Cloud Stream integration (binder) -->
<dependency>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>spring-cloud-starter-stream-solace</artifactId>
<version>2.0.1</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cloud-connectors</artifactId>
</exclusion>
</exclusions>
</dependency>

难道,这是Solace PubSub+活页夹的问题?我在这里读到,spring.cloud.stream.binders.<name>.consumer.concurrency的行为可能取决于粘合剂的实现。

这里可能有什么问题?

引用:

  • 慰藉酒吧+活页夹
  • 用于本地运行的 Solace PubSub+ 实例的 Docker 撰写文件:
# docker-compose -f PubSubStandard_singleNode.yml up
version: '3.3'
services:
primary:
container_name: pubSubStandardSingleNode
image: solace/solace-pubsub-standard:latest
shm_size: 1g
ulimits:
core: 1
nofile:
soft: 2448
hard: 38048
ports:
#Port Mappings:  Ports are mapped straight through from host to
#container.  This may result in port collisions on commonly used
#ports that will cause failure of the container to start.
#Web transport
- '80:80'
#Web transport over TLS
- '443:443'
#SEMP over TLS
- '943:943'
#MQTT Default VPN
#- '1883:1883'
#AMQP Default VPN over TLS
- '5671:5671'
#AMQP Default VPN
- '5672:5672'
#MQTT Default VPN over WebSockets
#- '8000:8000'
#MQTT Default VPN over WebSockets / TLS
#- '8443:8443'
#MQTT Default VPN over TLS
#- '8883:8883'
#SEMP / PubSub+ Manager
- '8080:8080'
#REST Default VPN
#- '9000:9000'
#REST Default VPN over TLS
#- '9443:9443'
#SMF
- '55555:55555'
#SMF Compressed
#- '55003:55003'
#SMF over TLS
- '55443:55443'
environment:
- username_admin_globalaccesslevel=admin
- username_admin_password=admin
- system_scaling_maxconnectioncount=100

好的,我会自己回答这个问题。

使用 RabbitMQ 绑定器和正在运行的 Rabbit 实例尝试上述配置,并发性工作得很好。所以我认为一定是Solace活页夹在制造问题。

经过一番谷歌搜索,我确实找到了确认:https://github.com/SolaceProducts/solace-spring-cloud/issues/7

显然,Solace PubSub+ binder目前不支持并发性,这是一个真正的无赖。至少看起来问题正在解决中。

这里还有一些社区讨论: https://solace.community/discussion/284/concurrency-property-with-solace-spring-cloud-stream-api#latest

更新

此问题似乎已在春云流 Solace 活页夹的第2.1.1版中得到修复。 即使用此依赖项

<dependency>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>spring-cloud-starter-stream-solace</artifactId>
<version>2.1.1</version
</dependency>

如果您使用的是 Spring Cloud Solace BOM,则至少需要转到版本1.1.1

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-bom</artifactId>
<version>1.1.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

这适用于 春云Hoxton.SR6.