Spring Cloud Stream with RabbitMQ Streams



我正在尝试使用;新的";RabbitMQ的Streams插件与我的春季云流项目使用";函数编程模型";。

我已经设置了我的应用程序

spring:
rabbitmq:
listener:
type: stream
stream:
host: ${RABBIT_HOST:localhost}
port: ${RABBIT_PORT:5672}
username: guest
password: guest
name: demo
cloud:
function:
definition: testConsumer
stream:
rabbit:
bindings:
testConsumer-in-0:
consumer:
containerType: stream
bindings:
testConsumer-in-0:
group: demo
destination: test
testProducer-out-0:
destination: test

我有一个使用StreamBridge的@PostConstruct方法,如下所示:

streamBridge.send("testProducer-out-0", "testing..");

我的测试消费者是这样的:

@Bean
public Consumer<Flux<String>> testConsumer() {
return flux -> flux.doOnEach(LOGGER::info);
}

但当我启动我的应用程序时,我会得到一个异常:

Caused by: com.rabbitmq.stream.StreamException: Could not get response in 10000 ms

在我的RabbitMQ容器的日志中,我得到了以下错误:

2022-09-14 13:30:53.485574+00:00 [error] <0.32309.0> {bad_header,<<0,0,1,0,0,17,0,1>>}

如果我将spring.cloud.stream.rabbit.bindings.testConsumer-in.0.consumer.contangerType设置为direct,一切都很好。

有人知道为什么吗?

看起来您正试图通过AMQP端口而不是流端口进行连接。

流端口为5552。

您是否映射流端口并启用插件?https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-first-application/

最新更新