我们正在编写一个Batch Job,它从FTP接收一个文件作为输入,生成一些新文件并将它们写入S3存储桶,为此我们使用Spring Integration。
FTP中的文件是从数据库中提取的,每晚都会更新。
问题是,当我们第一次启动应用程序时,它会很好地连接到FTP,下载文件,并上传生成结果S3。然后我们在本地删除下载的文件,并等待FTP中的下一代文件重新启动进程。但它再也不会下载该文件。
知道吗?
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(ftpReader(),
spec -> spec.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(period)))
.enrichHeaders(Map.of("CORRELATION_ID", "rcm"))
.aggregate(aggregatorSpec -> aggregatorSpec
.correlationStrategy(message -> message.getHeaders().get("CORRELATION_ID"))
.releaseStrategy(group -> group.getMessages().size() == 2))
.transform(stockUnmarshaller)
.transform(stockTransformer)
.transform(stockMarshaller)
.transform(picturesDownloader)
.transform(picturesZipper)
.transform(stockIndexer)
.handle(directoryCleaner)
.nullChannel();
}
@Bean
public FtpInboundChannelAdapterSpec ftpReader() {
return Ftp.inboundAdapter(ftpSessionFactory())
.preserveTimestamp(true)
.remoteDirectory(rootFolder)
.autoCreateLocalDirectory(true)
.localDirectory(new File(localDirectory));
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sessionFactory = new DefaultFtpSessionFactory();
sessionFactory.setHost(host);
sessionFactory.setUsername(userName);
sessionFactory.setPassword(password);
sessionFactory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
return sessionFactory;
}
提前谢谢。
编辑:
如果正好有两个文件,我使用enrichHeaders
来确保管道被触发。也许标题没有被删除,并且条件总是大于2?也许这是错误的做法?
再次感谢。
听起来像是在谈论同一个文件。在这种情况下,从本地目录中删除它是不够的。进程中涉及一些FileListFilter
实例,这些实例包含已处理文件的条目。根据您的配置,您可以处理内存中的变体。他们真的对你的本地文件删除一无所知。
确切地说,您需要担心两个过滤器:FtpPersistentAcceptOnceFileListFilter
用于远程条目,FileSystemPersistentAcceptOnceFileListFilter
用于文件的本地副本。它们都实现了ResettableFileListFilter
,所以,无论何时处理完文件进程,都可以调用它们的remove()
。
Java DSL中的FtpInboundChannelAdapterSpec
具有以下选项:
/**
* Configure a {@link FileListFilter} to be applied to the remote files before
* copying them.
* @param filter the filter.
* @return the spec.
*/
public S filter(FileListFilter<F> filter) {
/**
* A {@link FileListFilter} used to determine which files will generate messages
* after they have been synchronized.
* @param localFileListFilter the localFileListFilter.
* @return the spec.
* @see AbstractInboundFileSynchronizingMessageSource#setLocalFilter(FileListFilter)
*/
public S localFilter(FileListFilter<File> localFileListFilter) {
因此,您仍然可以将这些提到的过滤器作为默认过滤器,但您可以将它们提取为bean,并注入到这些选项和directoryCleaner
中,以执行从这些过滤器中删除的操作。
还有一个选项,如:
/**
* Switch the local {@link FileReadingMessageSource} to use its internal
* {@code FileReadingMessageSource.WatchServiceDirectoryScanner}.
* @param useWatchService the {@code boolean} flag to switch to
* {@code FileReadingMessageSource.WatchServiceDirectoryScanner} on {@code true}.
* @since 5.0
*/
public void setUseWatchService(boolean useWatchService) {
并为观察者配置了DELETE
事件。发生这种情况时,已删除的文件也会从本地筛选器中删除。
当您配置时,您也可以正确处理远程文件
/**
* Set to true to enable the preservation of the remote file timestamp when transferring.
* @param preserveTimestamp true to preserve.
* @return the spec.
*/
public S preserveTimestamp(boolean preserveTimestamp) {
这样,具有相同名称的较新文件将被视为不同的文件,并且其在上述过滤器中的条目将被覆盖。虽然我看到你已经用过了,但你仍然抱怨它不起作用。当FileSystemPersistentAcceptOnceFileListFilter
不用于本地文件时,某些旧版本的Spring Integration可能会出现这种情况。
入站通道适配器有两个筛选器.filter
和.localFilter
。
第一个在下载之前过滤远程文件,第二个在文件系统上过滤文件。
默认情况下,filter
是一个FtpPersistentAcceptOnceFileListFilter
,它只会获取新的或更改的文件。
默认情况下,localFilter
是FileSystemPersistentAcceptOnceFileListFilter
,同样,它只会在文件的时间戳发生更改时第二次传递文件。
因此,只有当文件的时间戳发生更改时,才会重新处理该文件。
我建议你在调试器中运行,看看它为什么没有通过过滤器。