TaskExecutor未在Spring Integration中工作



我已经设置了带有任务执行器的文件轮询器

ExecutorService executorService = Executors.newFixedThreadPool(10);
LOG.info("Setting up the poller for directory {} ", finalDirectory);
StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(new CustomFileReadingSource(finalDirectory),
c -> c.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS, 5)
.taskExecutor(executorService)
.maxMessagesPerPoll(10)
.advice(new LoggerSourceAdvisor(finalDirectory))
))

//move file to processing first processing                    
.transform(new FileMoveTransformer("C:/processing", true))
.channel("fileRouter")
.get();

如图所示,我已经设置了10的固定threadpool和每次轮询的最大消息10。如果我放入10个文件,它仍然会逐一处理。这里可能出了什么问题?

*更新*

虽然我现在有其他问题,但在加里回答后,它运行得非常好。

我已经像这个一样设置了我的轮询器

setDirectory(new File(path));
DefaultDirectoryScanner scanner = new DefaultDirectoryScanner();
scanner.setFilter(new AcceptAllFileListFilter<>());
setScanner(scanner);

使用AcceptAll的原因是同一个文件可能会再次出现,这就是为什么我会先移动文件。但是,当我启用线程执行器时,多个线程正在处理同一个文件,我认为这是因为AcceptAllFile

如果我更改为AcceptOnceFileListFilter,它可以工作,但再次出现的同一文件将不会再次被拾取!如何避免这个问题?

问题/错误

AbstractPersistentAcceptOnceFileListFilter类中,我们有这个代码

@Override
public boolean accept(F file) {
String key = buildKey(file);
synchronized (this.monitor) {
String newValue = value(file);
String oldValue = this.store.putIfAbsent(key, newValue);
if (oldValue == null) { // not in store
flushIfNeeded();
return true;
}
// same value in store
if (!isEqual(file, oldValue) && this.store.replace(key, oldValue, newValue)) {
flushIfNeeded();
return true;
}
return false;
}
}

例如,如果我有setup max per poll 5,并且有两个文件,那么它可能的相同文件将由两个线程获取。

假设我的代码在读取后会移动文件。

但另一个线程到达accept方法

如果文件不在那里,那么它将返回lastModified时间为0,并返回true。

这导致了问题,因为文件不在那里。

如果它为0,那么它应该返回false,因为文件已经不在了。

将任务执行器添加到轮询器时;所做的只是调度器线程将轮询任务交给线程池中的线程;CCD_ 7是轮询任务的一部分。轮询器本身每5秒只运行一次。要获得您想要的内容,您应该向流添加一个执行器通道。。。

@SpringBootApplication
public class So53521593Application {
private static final Logger logger = LoggerFactory.getLogger(So53521593Application.class);
public static void main(String[] args) {
SpringApplication.run(So53521593Application.class, args);
}
@Bean
public IntegrationFlow flow() {
ExecutorService exec = Executors.newFixedThreadPool(10);
return IntegrationFlows.from(() -> "foo", e -> e
.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
.maxMessagesPerPoll(10)))
.channel(MessageChannels.executor(exec))
.<String>handle((p, h) -> {
try {
logger.info(p);
Thread.sleep(10_000);
}
catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
return null;
})
.get();
}
}

编辑

它对我来说很好…

@Bean
public IntegrationFlow flow() {
ExecutorService exec = Executors.newFixedThreadPool(10);
return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
.maxMessagesPerPoll(10)))
.channel(MessageChannels.executor(exec))
.handle((p, h) -> {
try {
logger.info(p.toString());
Thread.sleep(10_000);
}
catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
return null;
})
.get();
}

2018-11-28 11:46:05.196信息57607---[pool-1-thread-1]com.example.So53521593应用程序:/tmp/foo/test1.txt

2018-11-28 11:46:05.197信息57607---[pool-1-thread-2]com.example.So53521593应用程序:/tmp/foo/test2.txt

touch test1.txt

2018-11-28 11:48:00.284信息57607---[pool-1-thread-3]com.example.So53521593应用程序:/tmp/foo/test1.txt

EDIT1

同意-转载于此。。。

@Bean
public IntegrationFlow flow() {
ExecutorService exec = Executors.newFixedThreadPool(10);
return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
.maxMessagesPerPoll(10)))
.channel(MessageChannels.executor(exec))
.<File>handle((p, h) -> {
try {
p.delete();
logger.info(p.toString());
Thread.sleep(10_000);
}
catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
return null;
})
.get();
}

2018-11-28 13:22:23.689信息75681---[pool-1-thread-1]com.example.So53521593应用程序:/tmp/foo/test1.txt

2018-11-28 13:22:23.690信息75681---〔pool-1-thread-2〕com.example.So53521593应用程序:/tmp/foo/test2.txt

2018-11-28 13:22:23.690信息75681--[pool-1-thread-3]com.example.So53521593应用程序:/tmp/foo/test1.txt

2018-11-28 13:22:23.690信息75681--[pool-1-thread-4]com.example.So53521593应用程序:/tmp/foo/test2.txt

相关内容

  • 没有找到相关文章

最新更新