是否可以将Spring Integration用于带有ExecutorChannel的批处理过程或带有任务执行器的轮询器



我正在尝试将Spring Integration用于批处理过程。有些步骤很耗时,因此使用QueueChannel会受益,因为QueueChannel有多个使用者,每个使用者都在一个单独的线程上运行。

这种方法的问题在于,在消耗完所有消息之后,没有干净的方法来关闭应用程序。我尝试过使用控制总线并关闭任务执行器,但只有当你能猜测到什么时候所有消息都会被消耗掉,并且没有一条消息在飞行时,这才有效,这是不可能的。

有没有一种干净的方法可以为批处理过程做到这一点,或者这只是在中使用Spring Integration的错误用例?

编辑:

从本质上讲,如果有一种方法让我发送一条特殊的消息,表示所有spring集成组件自动执行的生命周期事件之一,如启动或停止,那就太好了。通过这种方式,停止消息可以保证到达最后,并且如果有方法在停止消息到达生命周期感知bean时触发shutdown((。

在所有消息消耗完后关闭应用程序

如果您知道一开始的消息数量,那么您可以有一个AtomicIntegerbean,并在每次处理消息时递增它。另外,您有一个入站通道适配器来轮询AtomicInteger的当前状态,并决定关闭您的应用程序。

或者,您可以使用聚合器来收集所有消息的结果,并在聚合器之后关闭下游流中的应用程序。

最新更新