我目前有一个spring集成(v4.3.24(流程,如下所示:
|
| list of
| filepaths
+----v---+
|splitter|
+----+---+
| filepath
|
+----------v----------+
|sftp-outbound-gateway|
| "get" |
+----------+----------+
| file
+---------------------+
| +----v----+ |
| |decryptor| |
| +----+----+ |
| | |
| +-----v------+ | set of transformers
| |decompressor| | (with routers before them
| +-----+------+ | because some steps are optional)
| | | that process the file;
| +--v--+ | call this "FileProcessor"
| | ... | |
| +--+--+ |
+---------------------+
|
+----v----+
|save file|
| to disk |
+----+----+
|
上面所有的通道都是DirectChannel
s-是的,我知道这是一个糟糕的结构。这对于少量文件来说效果很好。但现在,我必须处理数千个需要经过相同流程的文件——基准测试显示,这需要大约1天的时间才能完成处理。因此,我计划在这个流程中引入一些并行处理。我想修改我的流程以实现这样的目标:
|
|
+----------v----------+
|sftp-outbound-gateway|
| "mget" |
+----------+----------+
| list of files
|
+----v---+
|splitter|
+----+---+
one thread one | thread ...
+------------------------+---------------+--+--+--+--+
| file | file | | | | |
+---------------------+ +---------------------+
| +----v----+ | | +----v----+ |
| |decryptor| | | |decryptor| |
| +----+----+ | | +----+----+ |
| | | | | |
| +-----v------+ | | +-----v------+ | ...
| |decompressor| | | |decompressor| |
| +-----+------+ | | +-----+------+ |
| | | | | |
| +--v--+ | | +--v--+ |
| | ... | | | | ... | |
| +--+--+ | | +--+--+ |
+---------------------+ +---------------------+
| |
+----v----+ +----v----+
|save file| |save file|
| to disk | | to disk |
+----+----+ +----+----+
| |
| |
对于并行处理,我将拆分器上的文件输出到带有ThreadPoolTaskExecutor
的ExecutorChannel
。
我有一些问题:
我想要所有的";文件处理器";一个文件发生在同一个线程上,同时并行处理多个文件的步骤。我怎样才能做到这一点
我从这个答案中看到,ExecutorChannel
到MessageHandlerChain
流将提供这样的功能。但是,里面的一些步骤";文件处理器";是可选的(使用带有路由器的selector-expression
跳过某些步骤(-排除了使用MessageHandlerChain
的可能性。我可以在里面安装几个MessageHandlerChain
和Filter
,但这或多或少成为了#2中提到的方法。如果不能实现#1,那么将从分离器开始的所有通道类型从
DirectChannel
更改为ExecutorChannel
是否有助于引入一些并行性?如果是,我应该为每个通道创建一个新的TaskExecutor
,还是可以为所有通道重用一个TaskExecutor
bean(我不能在TaskExecutor
bean上设置scope="prototype"
(?在你看来,哪种方法(#1或#2(更好?为什么?
如果我执行全局错误处理,就像这里提到的方法一样,即使一个文件出错,其他文件也会继续处理吗?
它将根据您的需要使用ExecutorChannel
作为解密器的输入,并将所有其他通道保留为直接通道;剩下的流不一定是一个链,每个组件都将在执行器的一个线程上运行。
您需要确保所有下游组件都是线程安全的。
错误处理应保持原样;每个子流是独立的。