弹簧集成DSL分散收集流块



我有一个集成流,它执行分散收集操作,该操作命中多个返回 JSON 的 HTTP 端点。然后将结果聚合到单个 JSON 对象中。流程是这样的

@Bean
public IntegrationFlow myFlow(IMyService myService, IMyOtherService myOtherService) {
return f -> f.enrichHeaders(eh -> eh.headerExpression(Headers.PAYLOAD, "payload"))
.handle(HeaderPrinter::headerPrinter)
.enrichHeaders(httpRequestHeaderEnricher())
.scatterGather(
scatterer -> scatterer.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint1"))
.handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint2"))
.handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
.applySequence(true),
gatherer -> gatherer.outputProcessor(MyFlows::aggregateJsonFromMultipleSources)
)
.handle(myService, "handleAggregatedJson")
.handle(HeaderPrinter::headerPrinter)
.handle(myOtherService, "handleMyServiceOutput")
.channel("myFlow.output");
}

我使用声明如下的网关启动流程

@MessagingGateway
public interface IMyGateway {
@Gateway(requestChannel = "myFlow.input", replyChannel = "myFlow.output")
MyResult startFlow(@Payload String payload, @Header("header1") String header1, @Header("header2") String header2);
}

我遇到的问题是整个流阻塞和网关超时。我在两个服务调用IMyService::handleAggregatedJsonIMyOutherService::handleMyServiceResult中放置了断点,它们都在运行,但输出永远不会到达网关的回复通道。如果我删除最后两个句柄操作,则流通常会通过网关返回结果。

我已经在流被阻塞时查看了堆栈跟踪,我可以看到运行流的线程正在等待锁定

java.lang.Thread.State: WAIT at sun.misc.Unsafe.park(Unsafe.java:-1) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSyncr.java:836) 在 java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSyncr.java:997) 在 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSyncr.java:1304) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) 在 org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel.receive(GenericMessagingTemplate.java:308) 在 org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel.receive(GenericMessagingTemplate.java:300) 在 org.springframework.messaging.core.GenericMessagingTemplate.doReceive(GenericMessagingTemplate.java:201) 在 org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:234) 在 org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:47) 在 org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:45) 在 org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) 在 org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:38) 在 org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:95) 在 org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:85) 在 org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:487) 在 org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceive(MessagingGatewaySupport.java:461) 在 org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:520) 在 org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:469) 在 org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:460) 在 org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) 在 org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) at com.sun.proxy.$Proxy 116.startFlow(未知来源:-1)

据我怀疑,如果流量花费的时间超过 X 次,那么它会阻塞。我尝试在流和网关之间放置一个会合通道,但它似乎不起作用。

关于导致超时问题的原因的任何想法?

附录:我一直在摆弄代码并删除网关上的返回类型,流上的最后一个 .channel 调用似乎确实停止阻止它。

以下工作正常

@Bean
public IntegrationFlow myFlow(IMyService myService, IMyOtherService myOtherService) {
return f -> f.enrichHeaders(eh -> eh.headerExpression(Headers.PAYLOAD, "payload"))
.handle(HeaderPrinter::headerPrinter)
.enrichHeaders(httpRequestHeaderEnricher())
.scatterGather(
scatterer -> scatterer.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint1"))
.handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint2"))
.handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
.applySequence(true),
gatherer -> gatherer.outputProcessor(MyFlows::aggregateJsonFromMultipleSources)
)
.handle(myService, "handleAggregatedJson")
.handle(HeaderPrinter::headerPrinter)
.handle(myOtherService, "handleMyServiceOutput")
.handle(m -> {
log.info("Flow completed successfully, payload as expected:" + payload);
});
}

我想知道你是否所有

.handle(myService, "handleAggregatedJson")
.handle(HeaderPrinter::headerPrinter)
.handle(myOtherService, "handleMyServiceOutput")

收集后返回一些值。请求-答复的典型错误,即流中的某个步骤停止以某个合理的值进行回复。

更新

应考虑从@Gateway定义中删除显式replyChannel声明,并从流末尾删除.channel("myFlow.output")。 这样,您应该得到对replyChannel标头的回复。当您配置显式replyChannel时,不能保证您不会有其他订阅者访问此频道,这将"窃取"您的回复消息。

有关详细信息,请参阅参考手册。

相关内容

  • 没有找到相关文章

最新更新