实现回调机制/通知的策略,当所有异步弹簧集成流/线程执行完成时



我有 Spring 集成流,每天触发一次,它从数据库中提取所有各方并将每一方发送到执行器通道。 下一个流将为每一方拉取数据,然后通过发送到不同的执行器通道并行处理它们。 我面临的挑战是我如何知道整个过程何时结束。关于如何解决这个问题的任何想法. 这是我的执行程序通道和集成流的伪代码。

@Bean
public IntegrationFlow fileListener() {
return IntegrationFlows.from(Files.inboundAdapter(new 
File("pathtofile"))).channel("mychannel").get();
}
@Bean
public IntegrationFlow flowOne() throws ParserConfigurationException {
return IntegrationFlows.from("mychannel").handle("serviceHandlerOne", 
"handle").nullChannel();
}
@Bean
public IntegrationFlow parallelFlowOne() throws ParserConfigurationException {
return IntegrationFlows.from("executorChannelOne").handle("parallelServiceHandlerOne", 
"handle").nullChannel();
}
@Bean
public IntegrationFlow parallelFlowTwo() throws ParserConfigurationException {
return IntegrationFlows.from("executorChannelTwo").handle("parallelServiceHandlerTwo", 
"handle").nullChannel();
}
@Bean
public MessageChannel executorChannelOne() {
return new ExecutorChannel(
Executors.newFixedThreadPool(10));
}
@Bean
public MessageChannel executorChannelTwo;() {
return new ExecutorChannel(
Executors.newFixedThreadPool(10));
}
@Component
@Scope("prototype")
public class ServiceHandlerOne{
@Autowired
MessageChannel executorChannelOne;

@ServiceActivator
public Message<?> handle(Message<?> message) {
List<?> rowDatas = repository.findAll("parties");
rowDatas.stream().forEach(data -> {
Message<?> message = MessageBuilder.withPayload(data).build();
executorChannelOne.send(message);
});
return message;
}
}
@Component
@Scope("prototype")
public class ParallelServiceHandlerOne{
@Autowired
MessageChannel executorChannelTwo;;

@ServiceActivator
public Message<?> handle(Message<?> message) {
List<?> rowDatas = repository.findAll("party");
rowDatas.stream().forEach(data -> {
Message<?> message = MessageBuilder.withPayload(data).build();
executorChannelTwo;.send(message);
});
return message;
}
}

首先没有理由使您的服务@Scope("prototype"):我没有看到您的服务中有任何状态,因此它们是无状态的,因此可以简单地singleton。第二:由于你使你的流以nullChannel()结尾,因此从你的服务方法中返回任何内容。因此,只要void,流动就会自然地结束。

另一个观察结果:您直接在服务方法的代码中使用executorChannelOne.send(message)。如果您只是从服务方法返回该新消息并将该executorChannelOne作为该handle("parallelServiceHandlerOne", "handle")之后流定义中的下一个.channel(),则可以简单地实现相同的目标。

由于看起来您在循环中这样做,因此您可以考虑在两者之间添加一个.split():处理程序返回您的List<?> rowDatas,拆分器将负责迭代该数据并将每个项目回复该executorChannelOne

现在谈谈你最初的问题。

真的不容易说你的遗嘱执行人不再忙了。它们可能不是在请求时,只是因为任务的消息尚未到达执行程序通道。

通常,我们建议对数据使用一些异步同步器。聚合器是在传输中关联多条消息的好方法。这样,聚合器就会收集一个组,并且在该组完成之前不会发出回复。

我上面提到的拆分器默认添加序列详细信息标头,因此后续聚合器可以轻松跟踪消息组。

由于流中有层,因此看起来需要多个聚合器:拆分后两个用于执行器通道,一个用于文件的顶层。这两个将回复最终每个文件分组的顶级。

您也可以考虑使用PublishSubscribeChannel并行进行这些partiesparty调用,也可以配置applySequence=true。然后,顶级聚合器将使用此信息来获取每个文件的信息。

在文档中查看更多信息:

https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-implementations-publishsubscribechannel

https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#splitter

https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator

最新更新