我正在使用JMS Outbound Gateway
将消息发送到请求队列并从单独的响应队列接收消息。我想添加功能,以便在消息成功发送到请求队列后调用特定 Bean 的方法。
我为此使用了spring-integration 4.0.4
API 和 spring-integration-java-dsl 1.0.0
API,到目前为止,我已经能够实现上述功能,如下所示:
@Configuration
@EnableIntegration
public class IntegrationConfig {
...
@Bean
public IntegrationFlow requestFlow() {
return IntegrationFlows
.from("request.ch")
.routeToRecipients(r ->
r.ignoreSendFailures(false)
.recipient("request.ch.1", "true")
.recipient("request.ch.2", "true"))
.get();
}
@Bean
public IntegrationFlow sendReceiveFlow() {
return IntegrationFlows
.from("request.ch.1")
.handle(Jms.outboundGateway(cachingConnectionFactory)
.receiveTimeout(45000)
.requestDestination("REQUEST_QUEUE")
.replyDestination("RESPONSE_QUEUE")
.correlationKey("JMSCorrelationID"), e -> e.requiresReply(true))
.channel("response.ch").get();
}
@Bean
public IntegrationFlow postSendFlow() {
return IntegrationFlows
.from("request.ch.2")
.handle("requestSentService", "fireRequestSuccessfullySentEvent")
.get();
}
...
}
现在,尽管上述配置有效,但我注意到在request.ch.2
之前调用request.ch.1
的唯一明显原因似乎是因为频道名称的字母顺序,而不是因为它们添加到RecipientListRouter
本身的顺序。这是对的吗?还是我在这里错过了什么?
下面的编辑显示了在 JMS 出站/入站适配器方法(无消息传递网关)之间使用聚合器的解决方案 *
集成配置:
@Configuration
@EnableIntegration
public class IntegrationConfig {
...
@Bean
public IntegrationFlow reqFlow() {
return IntegrationFlows
.from("request.ch")
.enrichHeaders(e -> e.headerChannelsToString())
.enrichHeaders(e -> e.headerExpression(IntegrationMessageHeaderAccessor.CORRELATION_ID, "headers['" + MessageHeaders.REPLY_CHANNEL + "']"))
.routeToRecipients(r -> {
r.ignoreSendFailures(false);
r.recipient("jms.req.ch", "true");
r.recipient("jms.agg.ch", "true");
})
.get();
}
@Bean
public IntegrationFlow jmsReqFlow() {
return IntegrationFlows
.from("jms.req.ch")
.handle(Jms.outboundAdapter(cachingConnectionFactory)
.destination("TEST_REQUEST_CH")).get();
}
@Bean
public IntegrationFlow jmsPostReqFlow() {
return IntegrationFlows
.from("jms.req.ch")
.handle("postSendService", "postSendProcess")
.get();
}
@Bean
public IntegrationFlow jmsResFlow() {
return IntegrationFlows
.from(Jms.inboundAdapter(cachingConnectionFactory).destination(
"TEST_RESPONSE_CH"),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(10)))
.channel("jms.agg.ch").get();
}
@Bean
public IntegrationFlow jmsAggFlow() {
return IntegrationFlows
.from("jms.agg.ch")
.aggregate(a -> {
a.outputProcessor(g -> {
List<Message<?>> l = new ArrayList<Message<?>>(g.getMessages());
Message<?> firstMessage = l.get(0);
Message<?> lastMessage = (l.size() > 1) ? l.get(l.size() - 1) : firstMessage;
Message<?> messageOut = MessageBuilder.fromMessage(lastMessage)
.setHeader(MessageHeaders.REPLY_CHANNEL, (String) firstMessage.getHeaders().getReplyChannel())
.build();
return messageOut;
});
a.releaseStrategy(g -> g.size() == 2);
a.groupTimeout(45000);
a.sendPartialResultOnExpiry(false);
a.discardChannel("jms.agg.timeout.ch");
}, null)
.channel("response.ch")
.get();
}
}
@Bean
public IntegrationFlow jmsAggTimeoutFlow() {
return IntegrationFlows
.from("jms.agg.timeout.ch")
.handle(Message.class, (m, h) -> new ErrorMessage(new MessageTimeoutException(m), h))
.channel("error.ch")
.get();
}
}
干杯下午
H-m...看来。这确实是DslRecipientListRouter
逻辑中的一个错误:https://github.com/spring-projects/spring-integration-java-dsl/issues/9将很快修复并在几天内发布。
感谢您指出这一点!
顺便说一句,你的逻辑有点不正确:即使我们修复了该RecipientListRouter
,第二个 recipinet 也只会在JmsOutboundGateway
收到reply
之后才收到相同的请求消息,而不仅仅是在请求被发送到请求队列之后。它是阻止的请求-答复进程。并且在JmsOutboundGateway
中没有钩子可以在请求和回复之间获得分数.
这对你来说可以吗?