如何在单个线程中执行一系列步骤,并在spring集成中使用异步流



我目前有一个spring集成(v4.3.24(流程,如下所示:

|
| list of
| filepaths
+----v---+
|splitter|
+----+---+
| filepath
|
+----------v----------+
|sftp-outbound-gateway|
|        "get"        |
+----------+----------+
| file
+---------------------+
|     +----v----+     |
|     |decryptor|     |
|     +----+----+     |
|          |          |
|    +-----v------+   | set of transformers
|    |decompressor|   | (with routers before them
|    +-----+------+   | because some steps are optional)
|          |          | that process the file;
|       +--v--+       | call this "FileProcessor"
|       | ... |       |
|       +--+--+       |
+---------------------+
|
+----v----+
|save file|
| to disk |
+----+----+
|

上面所有的通道都是DirectChannels-是的,我知道这是一个糟糕的结构。这对于少量文件来说效果很好。但现在,我必须处理数千个需要经过相同流程的文件——基准测试显示,这需要大约1天的时间才能完成处理。因此,我计划在这个流程中引入一些并行处理。我想修改我的流程以实现这样的目标:

|
|
+----------v----------+
|sftp-outbound-gateway|
|       "mget"        |
+----------+----------+
| list of files
|
+----v---+
|splitter|
+----+---+
one thread             one | thread        ...
+------------------------+---------------+--+--+--+--+
| file                   | file          |  |  |  |  |
+---------------------+  +---------------------+
|     +----v----+     |  |     +----v----+     |
|     |decryptor|     |  |     |decryptor|     |
|     +----+----+     |  |     +----+----+     |
|          |          |  |          |          |
|    +-----v------+   |  |    +-----v------+   |   ...
|    |decompressor|   |  |    |decompressor|   |
|    +-----+------+   |  |    +-----+------+   |
|          |          |  |          |          |
|       +--v--+       |  |       +--v--+       |
|       | ... |       |  |       | ... |       |
|       +--+--+       |  |       +--+--+       |
+---------------------+  +---------------------+
|                        |
+----v----+              +----v----+
|save file|              |save file|
| to disk |              | to disk |
+----+----+              +----+----+
|                        |
|                        |

对于并行处理,我将拆分器上的文件输出到带有ThreadPoolTaskExecutorExecutorChannel

我有一些问题:

  1. 我想要所有的";文件处理器";一个文件发生在同一个线程上,同时并行处理多个文件的步骤。我怎样才能做到这一点
    我从这个答案中看到,ExecutorChannelMessageHandlerChain流将提供这样的功能。但是,里面的一些步骤";文件处理器";是可选的(使用带有路由器的selector-expression跳过某些步骤(-排除了使用MessageHandlerChain的可能性。我可以在里面安装几个MessageHandlerChainFilter,但这或多或少成为了#2中提到的方法。

  2. 如果不能实现#1,那么将从分离器开始的所有通道类型从DirectChannel更改为ExecutorChannel是否有助于引入一些并行性?如果是,我应该为每个通道创建一个新的TaskExecutor,还是可以为所有通道重用一个TaskExecutorbean(我不能在TaskExecutorbean上设置scope="prototype"(?

  3. 在你看来,哪种方法(#1或#2(更好?为什么?

  4. 如果我执行全局错误处理,就像这里提到的方法一样,即使一个文件出错,其他文件也会继续处理吗?

它将根据您的需要使用ExecutorChannel作为解密器的输入,并将所有其他通道保留为直接通道;剩下的流不一定是一个链,每个组件都将在执行器的一个线程上运行。

您需要确保所有下游组件都是线程安全的。

错误处理应保持原样;每个子流是独立的。

相关内容

最新更新