如何使用入站通道适配器处理不同队列中的多个文件



我正在使用Spring 4.3.8和Spring Integration 4.3.9。我希望同时处理具有不同队列的多个文件。例如,有一些文件名为 A001、A002、A003、B001、B002、B003 的文件。我想异步处理 Axxx 和 Bxxx。在完成处理 A001 和 B001 后,将处理 A002 和 B002。队列 A 和队列 B 不会相互影响,这意味着无论 B002 是否完成,都可以在 A002 完成后处理 A003。我不知道该怎么做。

我所做的如下所示。

<int-file:inbound-channel-adapter id="inFiles" directory="${integration.input.folder.path}" filename-regex="^A.*.dat$|^B.*.tsv$"
comparator="fileOrderComparator"
prevent-duplicates="false" >
<int:poller fixed-delay="10000" task-executor="taskExecutor" max-messages-per-poll="5" />
</int-file:inbound-channel-adapter>
<task:executor id="taskExecutor" pool-size="8" />
<int-file:outbound-gateway id="inFileProcessingGateway" directory="${integration.processing.folder.path}" request-channel="inFiles"
reply-channel="processFile" auto-create-directory="true" delete-source-files="true" />

但它会异步处理 A001、A002、B001、B002,这不是我所期望的。

要实现此目标(尽管尚不清楚出于什么原因),您肯定需要根据并行性要求使用max-messages-per-poll。收到每组后,您需要停止通道适配器。您可以使用AbstractMessageSourceAdvice:https://docs.spring.io/spring-integration/docs/5.0.3.RELEASE/reference/html/messaging-channels-section.html#conditional-pollers 来执行此操作。在执行所有这些任务之后,收集它们以确定您准备好轮询下一组文件的时刻会很棒。并再次启动文件读取通道适配器。为此,您可以使用聚合器:https://docs.spring.io/spring-integration/docs/5.0.3.RELEASE/reference/html/messaging-routing-chapter.html#aggregator

更新

因为我不希望 Axxx 和 Bxxx 的进程队列相互影响。也许 A002 没有完成,但 B002 完成了,B003 可以先开始处理,无需等待 A002 完成。这可能吗?

是的,这是一个有点不同的场景,可以通过非常简单的步骤来完成:

  1. 您不会在<int-file:inbound-channel-adapter>上使用task-executor
  2. 您可以使用Router作为<int-file:inbound-channel-adapter>输出通道的订户。在此路由器中可以检查有效负载的AB模式。或者对适当的FileHeaders.FILENAME标头执行相同的操作。因此,将A路由到一个通道,B路由到另一个通道。
  3. 由于您无法并行处理多个A,因此确实需要为这两个路由器的订阅者使用QueueChannel。不应为它们的轮询使用者配置ExecutorChannel以避免类似A文件的并发任务。B文件将从其自己的队列中轮询。这就是您将如何实现A并行性,并在单个队列中B支持顺序过程。

最新更新