JMS 出站网关请求目标 - 成功后处理



我正在使用JMS Outbound Gateway将消息发送到请求队列并从单独的响应队列接收消息。我想添加功能,以便在消息成功发送到请求队列后调用特定 Bean 的方法。

我为此使用了spring-integration 4.0.4 API 和 spring-integration-java-dsl 1.0.0 API,到目前为止,我已经能够实现上述功能,如下所示:

@Configuration
@EnableIntegration
public class IntegrationConfig {
    ...
    @Bean
    public IntegrationFlow requestFlow() {
        return IntegrationFlows
            .from("request.ch")
            .routeToRecipients(r ->
                r.ignoreSendFailures(false)
                 .recipient("request.ch.1", "true")
                 .recipient("request.ch.2", "true"))
            .get();
    }
    @Bean
    public IntegrationFlow sendReceiveFlow() {
        return IntegrationFlows
            .from("request.ch.1")
            .handle(Jms.outboundGateway(cachingConnectionFactory)
                    .receiveTimeout(45000)
                    .requestDestination("REQUEST_QUEUE")
                    .replyDestination("RESPONSE_QUEUE")
                    .correlationKey("JMSCorrelationID"), e -> e.requiresReply(true))
                    .channel("response.ch").get();
    }
    @Bean
    public IntegrationFlow postSendFlow() {
        return IntegrationFlows
            .from("request.ch.2")
            .handle("requestSentService", "fireRequestSuccessfullySentEvent")
            .get();
    }
    ...
}

现在,尽管上述配置有效,但我注意到在request.ch.2之前调用request.ch.1的唯一明显原因似乎是因为频道名称的字母顺序,而不是因为它们添加到RecipientListRouter本身的顺序。这是对的吗?还是我在这里错过了什么?

*

下面的编辑显示了在 JMS 出站/入站适配器方法(无消息传递网关)之间使用聚合器的解决方案 *

集成配置:

@Configuration
@EnableIntegration
public class IntegrationConfig { 
    ...
    @Bean
    public IntegrationFlow reqFlow() {
        return IntegrationFlows
            .from("request.ch")
            .enrichHeaders(e -> e.headerChannelsToString())
            .enrichHeaders(e -> e.headerExpression(IntegrationMessageHeaderAccessor.CORRELATION_ID, "headers['" + MessageHeaders.REPLY_CHANNEL + "']"))             
            .routeToRecipients(r -> {
                r.ignoreSendFailures(false);
                r.recipient("jms.req.ch", "true");
                r.recipient("jms.agg.ch", "true");
            })
            .get();
    }
    @Bean
    public IntegrationFlow jmsReqFlow() {
        return IntegrationFlows
            .from("jms.req.ch")
            .handle(Jms.outboundAdapter(cachingConnectionFactory)
                    .destination("TEST_REQUEST_CH")).get();
    }
    @Bean
    public IntegrationFlow jmsPostReqFlow() {
        return IntegrationFlows
            .from("jms.req.ch")
            .handle("postSendService", "postSendProcess")
            .get();
    }
    @Bean
    public IntegrationFlow jmsResFlow() {
        return IntegrationFlows
            .from(Jms.inboundAdapter(cachingConnectionFactory).destination(
                    "TEST_RESPONSE_CH"),
                    c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(10)))
            .channel("jms.agg.ch").get();
    }
    @Bean
    public IntegrationFlow jmsAggFlow() {
        return IntegrationFlows
            .from("jms.agg.ch")
            .aggregate(a -> { 
                a.outputProcessor(g -> {
                    List<Message<?>> l = new ArrayList<Message<?>>(g.getMessages());
                    Message<?> firstMessage = l.get(0);
                    Message<?> lastMessage = (l.size() > 1) ? l.get(l.size() - 1) : firstMessage;
                    Message<?> messageOut = MessageBuilder.fromMessage(lastMessage)
                            .setHeader(MessageHeaders.REPLY_CHANNEL, (String) firstMessage.getHeaders().getReplyChannel())
                            .build();
                     return messageOut;
                }); 
                a.releaseStrategy(g -> g.size() == 2);
                a.groupTimeout(45000);
                a.sendPartialResultOnExpiry(false);
                a.discardChannel("jms.agg.timeout.ch");
            }, null)
            .channel("response.ch")
            .get();
        }
    }
    @Bean
    public IntegrationFlow jmsAggTimeoutFlow() {
        return IntegrationFlows
            .from("jms.agg.timeout.ch")
            .handle(Message.class, (m, h) -> new ErrorMessage(new MessageTimeoutException(m), h))
            .channel("error.ch")
            .get();
    }
}

干杯下午

H-m...看来。这确实是DslRecipientListRouter逻辑中的一个错误:https://github.com/spring-projects/spring-integration-java-dsl/issues/9将很快修复并在几天内发布。

感谢您指出这一点!

顺便说一句,你的逻辑有点不正确:即使我们修复了该RecipientListRouter,第二个 recipinet 也只会在JmsOutboundGateway收到reply之后才收到相同的请求消息,而不仅仅是在请求被发送到请求队列之后。它是阻止的请求-答复进程。并且在JmsOutboundGateway中没有钩子可以在请求和回复之间获得分数.

这对你来说可以吗?