本地删除后未使用Spring Integration下载FTP文件



我们正在编写一个Batch Job,它从FTP接收一个文件作为输入,生成一些新文件并将它们写入S3存储桶,为此我们使用Spring Integration。

FTP中的文件是从数据库中提取的,每晚都会更新。

问题是,当我们第一次启动应用程序时,它会很好地连接到FTP,下载文件,并上传生成结果S3。然后我们在本地删除下载的文件,并等待FTP中的下一代文件重新启动进程。但它再也不会下载该文件。

知道吗?

@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(ftpReader(),
spec -> spec.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(period)))
.enrichHeaders(Map.of("CORRELATION_ID", "rcm"))
.aggregate(aggregatorSpec -> aggregatorSpec
.correlationStrategy(message -> message.getHeaders().get("CORRELATION_ID"))
.releaseStrategy(group -> group.getMessages().size() == 2))
.transform(stockUnmarshaller)
.transform(stockTransformer)
.transform(stockMarshaller)
.transform(picturesDownloader)
.transform(picturesZipper)
.transform(stockIndexer)
.handle(directoryCleaner)
.nullChannel();
}
@Bean
public FtpInboundChannelAdapterSpec ftpReader() {
return Ftp.inboundAdapter(ftpSessionFactory())
.preserveTimestamp(true)
.remoteDirectory(rootFolder)
.autoCreateLocalDirectory(true)
.localDirectory(new File(localDirectory));
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sessionFactory = new DefaultFtpSessionFactory();
sessionFactory.setHost(host);
sessionFactory.setUsername(userName);
sessionFactory.setPassword(password);
sessionFactory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
return sessionFactory;
}

提前谢谢。

编辑:

如果正好有两个文件,我使用enrichHeaders来确保管道被触发。也许标题没有被删除,并且条件总是大于2?也许这是错误的做法?

再次感谢。

听起来像是在谈论同一个文件。在这种情况下,从本地目录中删除它是不够的。进程中涉及一些FileListFilter实例,这些实例包含已处理文件的条目。根据您的配置,您可以处理内存中的变体。他们真的对你的本地文件删除一无所知。

确切地说,您需要担心两个过滤器:FtpPersistentAcceptOnceFileListFilter用于远程条目,FileSystemPersistentAcceptOnceFileListFilter用于文件的本地副本。它们都实现了ResettableFileListFilter,所以,无论何时处理完文件进程,都可以调用它们的remove()

Java DSL中的FtpInboundChannelAdapterSpec具有以下选项:

/**
* Configure a {@link FileListFilter} to be applied to the remote files before
* copying them.
* @param filter the filter.
* @return the spec.
*/
public S filter(FileListFilter<F> filter) {
/**
* A {@link FileListFilter} used to determine which files will generate messages
* after they have been synchronized.
* @param localFileListFilter the localFileListFilter.
* @return the spec.
* @see AbstractInboundFileSynchronizingMessageSource#setLocalFilter(FileListFilter)
*/
public S localFilter(FileListFilter<File> localFileListFilter) {

因此,您仍然可以将这些提到的过滤器作为默认过滤器,但您可以将它们提取为bean,并注入到这些选项和directoryCleaner中,以执行从这些过滤器中删除的操作。

还有一个选项,如:

/**
* Switch the local {@link FileReadingMessageSource} to use its internal
* {@code FileReadingMessageSource.WatchServiceDirectoryScanner}.
* @param useWatchService the {@code boolean} flag to switch to
* {@code FileReadingMessageSource.WatchServiceDirectoryScanner} on {@code true}.
* @since 5.0
*/
public void setUseWatchService(boolean useWatchService) {

并为观察者配置了DELETE事件。发生这种情况时,已删除的文件也会从本地筛选器中删除。

当您配置时,您也可以正确处理远程文件

/**
* Set to true to enable the preservation of the remote file timestamp when transferring.
* @param preserveTimestamp true to preserve.
* @return the spec.
*/
public S preserveTimestamp(boolean preserveTimestamp) {

这样,具有相同名称的较新文件将被视为不同的文件,并且其在上述过滤器中的条目将被覆盖。虽然我看到你已经用过了,但你仍然抱怨它不起作用。当FileSystemPersistentAcceptOnceFileListFilter不用于本地文件时,某些旧版本的Spring Integration可能会出现这种情况。

入站通道适配器有两个筛选器.filter.localFilter

第一个在下载之前过滤远程文件,第二个在文件系统上过滤文件。

默认情况下,filter是一个FtpPersistentAcceptOnceFileListFilter,它只会获取新的或更改的文件。

默认情况下,localFilterFileSystemPersistentAcceptOnceFileListFilter,同样,它只会在文件的时间戳发生更改时第二次传递文件。

因此,只有当文件的时间戳发生更改时,才会重新处理该文件。

我建议你在调试器中运行,看看它为什么没有通过过滤器。

最新更新