如何在Spring Integration Flow中正确实现延迟



我正在尝试在Spring集成流中实现延迟
我有一个流正在另一台服务器上启动一个进程,然后在延迟后检查该进程是否完成
流程完成后应进入下一阶段
这似乎起到了作用,它还显示在日志中(很明显,在流本身中(,runexampleScriptWaiting通道中的一长串重复。

我试着消除通道的变化,但后来流量永远停留在那个阶段,永远无法完成。

我如何实现这一点,以便显示/执行单个runexampleScriptWaiting(我想类似于非阻塞while循环(?

我考虑保持原样,只更新我的监控应用程序(一个非常小的前端,显示有效载荷历史中的通道(,以消除重复的通道线,但我也想知道是否有更好/更稳健的方法来做到这一点。

这里有一个简化的例子:

@Bean
public IntegrationFlow exampleIntegrationFlow() {
return IntegrationFlows
.from(exampleConfig.runexampleScript.get())
.<ExamplePayload>handle((payload, messageHeaders) -> examplePayloadService
.changeExampleServiceRequestStatus(payload, ExampleServiceStatus.STARTED))
.<ExamplePayload>handle(
(payload, messageHeaders) -> exampleScriptService.runexample(payload))
.channel(exampleConfig.runexampleScriptWaiting.get())
.<ExamplePayload, Boolean>route(jobStatusService::areJobsFinished,
router -> router
.subFlowMapping(true, exampleSuccessSubflow())
.subFlowMapping(false, exampleWaitSubflow())
.defaultOutputToParentFlow()
)
.get();
}
@Bean
public IntegrationFlow exampleWaitSubflow() {
return IntegrationFlows
.from(exampleConfig.runexampleScriptWaiting.get())
.<ExamplePayload>handle(
(payload, messageHeaders) -> {
interruptIgnoringSleep(1000);
return payload;
})
.channel(exampleConfig.runexampleScriptWaiting.get()) // Commenting this gets the process stuck
.get();
}

目前还不清楚您的exampleConfig.runexampleScriptWaiting.get()是什么,但到目前为止,您在配置中拥有的内容还不确定。您有两个订户访问同一频道:

  1. .channel(exampleConfig.runexampleScriptWaiting.get())和下一个route()

  2. .from(exampleConfig.runexampleScriptWaiting.get())和下一个handle()

这可能会导致意外行为,例如循环消息分发。

除了ExecutorChannel之外,我还会做filter()delay(),因为你问的是非阻塞重试:

.channel(exampleConfig.runexampleScriptWaiting.get())
.filter(jobStatusService::areJobsFinished, 
filter -> filter.discardFlow(
discardFlow -> discardFlow
.delay(1000)
.channel(exampleConfig.runexampleScriptWaiting.get())))

作为该流的一部分,exampleSuccessSubflow可以紧接在该filter()之后或者经由to(exampleSuccessSubflow())

注意discardFlow:我们将未完成的消息延迟一点,并将其生成回runexampleScriptWaiting通道,以便再次调用此筛选器。如果您将此通道设置为ExecutorChannel(或QueueChannel(,则您的等待功能将是非阻塞的。但与此同时,由于您仍在等待答复,您的主流仍将被阻止。因此,将这个过滤逻辑设为非阻塞可能没有太大意义,并且您仍然可以使用Thread.sleep()而不是delay()

路由器解决方案也可以工作,但不能将runexampleScriptWaiting通道用作该子流的输入。也许这就是你的问题背后的原因;过程卡住";。

最新更新