我刚刚盯着学习弹簧整合 我想在队列中接收消息并并行执行 2 个步骤: 第 1 步 -> 使用 bean 处理它 步骤 2 -> 转换并将其发送到另一个队列。 像这样:
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queue1")
.configureContainer(simpleMessageListenerContainerSpec -> {
simpleMessageListenerContainerSpec.concurrentConsumers(3);
}))
.log(message -> "SERVICE EVENT QUEUE : Received Message : " + message.getPayload())
.handle(serviceBean, "process")
.<String,String>transform(String::toLowerCase)
.log(message -> "SERVICE EVENT QUEUE : Transformed Message : " + message.getPayload())
.handle(
Amqp.outboundAdapter(rabbitTemplate)
.exchangeName("exchange")
.routingKey("queue2.routing"))
.get();
我错过了什么?第一个句柄之后的操作未执行。我想我没有正确理解这部分。 另外,我如何并行执行这两个步骤?
您应该从理论开始,了解Spring Integration中的许多概念和组件。
"并行 2 步" - 完全是一种发布-订阅模式:https://www.enterpriseintegrationpatterns.com/patterns/messaging/PublishSubscribeChannel.html 和 Spring 集成为它提供了一个实现:https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-implementations-publishsubscribechannel。正如您所看到的,根据文档要实现并行性,您需要使用TaskExecutor
配置这样的通道。
通过Java DSL,我们为发布-订阅配置提供了一个高级API:
https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-subflows
因此,要使您的.handle(serviceBean, "process")
和.<String,String>transform(String::toLowerCase)
并行,您需要具有如下所示的内容:
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queue1")
.configureContainer(simpleMessageListenerContainerSpec -> {
simpleMessageListenerContainerSpec.concurrentConsumers(3);
}))
.log(message -> "SERVICE EVENT QUEUE : Received Message : " + message.getPayload())
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.handle(serviceBean, "process")))
.<String,String>transform(String::toLowerCase)
.log(message -> "SERVICE EVENT QUEUE : Transformed Message : " + message.getPayload())
.handle(
Amqp.outboundAdapter(rabbitTemplate)
.exchangeName("exchange")
.routingKey("queue2.routing"))
.get();