Spring Integration-SFTP Polling-Inbound Adapter,用于获取文件名而不复制到



上下文

我们使用spring集成流来轮询输入目录中的新文件。文件名被传递给现有的spring批处理作业,该作业获取文件并处理业务处理。

return IntegrationFlows.from(
Files.inboundAdapter(new File(properties.getInputDir())).filter(new AcceptOnceFileListFilter<>()),
c -> c.poller(Pollers.fixedRate(300, TimeUnit.SECONDS).maxMessagesPerPoll(50)))
.log(LoggingHandler.Level.INFO, "Inb_GW_Msg", Message::getPayload)
.channel(c -> c.executor(jobsTaskExecutor))
.transform(fileMessageToJobRequest)
.handle(jobLaunchingGateway, e -> e.advice(jobExecutionAdvice()))
.log(LoggingHandler.Level.INFO, "Inb_GW_Msg_Processing_Result")
.get();

转换器将输入文件路径设置为作业参数

@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName, message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}

我们正在尝试修改流,以使用远程SFTP服务器作为文件源。spring批处理作业使用出站spring集成网关,使用基于此处提供的建议的路径获取远程文件。spring integration-并行访问SFTP出站网关GET w/STREAM并访问队列通道的响应,效果良好。

问题

在试图修改入站轮询器以从远程sftp服务器获取文件名时,我们在试图找到合适的入站适配器时遇到了问题。

Sftp.inboundAdapter(SessionFactory<ChannelSftp.LsEntry> sessionFactory)的使用建立了一个SftpInboundFileSynchronizingMessageSource,它将远程文件系统同步到本地目录。这种行为不符合我们的需求,我们正在寻找一个远程轮询器,它只会获取与本地文件系统轮询器类似的文件名。

我知道SFTP出站网关支持"ls"操作,解决轮询需求的一种方法是实现一个自定义消息源/轮询器,它定期连接到出站网关以获取目录列表。

是否有现成的适配器支持此要求?

在不编写大量自定义代码和/或修改现有春季批处理作业的情况下,寻求更简单的方法来满足我们的轮询需求的建议。

更新

看起来Sftp.inboundStreamingAdapter设置的SftpStreamingMessageSource将返回远程文件元数据以及输入流。我可以简单地立即丢弃/关闭流,并使用元数据来启动春季批处理作业。

我会尝试一下并发布更新。感谢对此设计理念的反馈。

使用带有list(LS(命令的出站网关。

https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#using-ls命令

编辑

@Bean
IntegrationFlow flow(DefaultSftpSessionFactory sf) {
return IntegrationFlows.fromSupplier(() -> "dir", e -> e.poller(Pollers.fixedDelay(5000)))
.handle(Sftp.outboundGateway(sf, Command.LS, "payload")
.options(Option.NAME_ONLY))
.split()
.log()
.get();
}