将多个RabbitMQ交换绑定到单个队列会在Spring Cloud Stream中引发错误



我在Spring Cloud Stream应用程序中使用RabbitMQ作为输入。我试图将多个交换绑定到一个队列输出,并通过逗号分隔列出交换来实现这一点(如文档中所述(。这是我的application-orders.yml:

spring:
main:
# added this to avoid 
# org.springframework.beans.factory.support.BeanDefinitionOverrideException: Invalid bean definition with name 'my.queue.orders.errors.recoverer' defined in null: Cannot register bean definition for bean 'my.queue.orders.errors.recoverer': There is already  bound.
allow-bean-definition-overriding: true
rabbitmq:
# username, password, address
cloud:
stream:
bindings:
input:
binder: rabbitmq
destination: mapper1.fanout.replication,mapper2.fanout.replication
group: my.queue.orders

这是在基础application.yml:中

(spring.cloud.)stream:
default:
content-type: application/json
binders:
rabbit:
type: rabbit
bindings:
input:
binder: ${application.source.binder}
# e.g. rabbit exchange name
destination: ${application.source.destination}
# e.g. rabbit queue name
group: ${application.source.group}
consumer:
max-attempts: 1
# number of threads consuming messages
# may be critical because of ordering of messages
concurrency: 1
...
rabbit:
binder:
nodes: ${application.source.nodes}
bindings:
input:
consumer:
exchangeType: fanout
queueNameGroupOnly: true
exchangeAutoDelete: false
# the following settings will not ACK the message on failure during send to EventHubs
republishToDlq: false
requeueRejected: true

当我从交换机mapper1.fanout.replication发送消息时,它按预期工作。但如果我从第二个交换机mapper2.fanout.replication发送消息,我会得到以下信息:

2020-11-06 14:32:06.480  INFO 67016 --- [ask-scheduler-1] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: my.queue.orders, bound to: mappper2.fanout.replication
2020-11-06 14:32:06.481 DEBUG 67016 --- [ask-scheduler-1] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitAdmin$$Lambda$1049/0x0000000800868c40 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@4a5f9770 Shared Rabbit Connection: SimpleConnection@5ec708d9 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 62850]
2020-11-06 14:32:06.481 DEBUG 67016 --- [ask-scheduler-1] o.s.amqp.rabbit.core.RabbitAdmin         : declaring Exchange 'mapper2.fanout.replication'
2020-11-06 14:32:06.488 DEBUG 67016 --- [ask-scheduler-1] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitAdmin$$Lambda$1059/0x000000080086c440 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@4a5f9770 Shared Rabbit Connection: SimpleConnection@5ec708d9 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 62850]
2020-11-06 14:32:06.488 DEBUG 67016 --- [ask-scheduler-1] o.s.amqp.rabbit.core.RabbitAdmin         : declaring Queue 'my.queue.orders'
2020-11-06 14:32:06.493 DEBUG 67016 --- [ask-scheduler-1] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitAdmin$$Lambda$1060/0x000000080086b840 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@4a5f9770 Shared Rabbit Connection: SimpleConnection@5ec708d9 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 62850]
2020-11-06 14:32:06.493 DEBUG 67016 --- [ask-scheduler-1] o.s.amqp.rabbit.core.RabbitAdmin         : Binding destination [my.queue.orders (QUEUE)] to exchange [mapper2.fanout.replication] with routing key []
2020-11-06 14:32:06.497 DEBUG 67016 --- [ask-scheduler-1] c.s.b.r.p.RabbitExchangeQueueProvisioner : autoBindDLQ=false for: my.queue.orders
2020-11-06 14:32:06.497 DEBUG 67016 --- [ask-scheduler-1] o.s.a.r.l.SimpleMessageListenerContainer : Changing consumers from 1 to 1
2020-11-06 14:32:06.499 ERROR 67016 --- [ask-scheduler-1] o.s.cloud.stream.binding.BindingService  : Failed to create consumer binding; retrying in 30 seconds
org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer: 
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:461)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:90)
at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143)
at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleConsumerBinding$1(BindingService.java:201)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: Only one LastSubscriberMessageHandler is allowed
at org.springframework.cloud.stream.binder.BinderErrorChannel.subscribe(BinderErrorChannel.java:44)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:712)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:633)
at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.createConsumerEndpoint(RabbitMessageChannelBinder.java:511)
at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.createConsumerEndpoint(RabbitMessageChannelBinder.java:129)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:407)
... 10 common frames omitted

我有一个自定义处理器如下:

interface Processor {
String INPUT = "input";
@Input(Processor.INPUT)
SubscribableChannel input();
//outputs...
} 

和一个MessageHandler类:

@Profile({ "orders" })
@EnableBinding(Processor.class)
@RequiredArgsConstructor
@Slf4j
public class MessageHandler {
@Autowired
private Processor pipe;
@StreamListener(Processor.INPUT)
public void handleInputMessage(final Message<String> message) {
//do stuff...
}
...
}

我正在使用Spring Cloud Hoxton。SR8和spring-cloud-stream-binder-rabbit-3.0.8

有什么想法吗?

尝试将multiplex使用者属性设置为true;然后,您将从两个队列中获得一个消耗的绑定。

https://docs.spring.io/spring-cloud-stream/docs/3.0.8.RELEASE/reference/html/spring-cloud-stream.html#_consumer_properties

最新更新