使用Spring Integration动态创建SFTP读取器



我的多租户应用程序需要定期检查一组SFTP文件夹中的接收文件。SFTP文件夹和参数在我的application.properties文件中定义,编号是动态的。

ingestion.carriers=carrier1,carrier2
ingestion.carrier1=sftp
ingestion.carrier1.sftp.host=localhost
ingestion.carrier1.sftp.username=sftp
ingestion.carrier1.sftp.password=sftp
ingestion.carrier1.sftp.remotedir=carrier1
ingestion.carrier1.sftp.localdir=sftp/carrier1
ingestion.carrier1.sftp.archivedir=sftp/carrier1/archive
ingestion.carrier1.sftp.errordir=sftp/carrier1/error
ingestion.carrier1.ping=7000
ingestion.carrier2=sftp
ingestion.carrier2.sftp.host=localhost
ingestion.carrier2.sftp.username=sftp
ingestion.carrier2.sftp.password=sftp
ingestion.carrier2.sftp.remotedir=carrier2
ingestion.carrier2.sftp.localdir=sftp/carrier2
ingestion.carrier2.sftp.archivedir=sftp/carrier2/archive
ingestion.carrier2.sftp.errordir=sftp/carrier2/error
ingestion.carrier2.pingFrequency=13000

我需要创建所有必要的bean来实现spring集成流。为此,我尝试设置BeanFactoryPostProcessor,因为我无法在";静态";方法该处理器被认为在未来会配置不同的方法来检索文件:因此,bean的实际创建被委托给另一个类。

这是后处理器。。。

package mypkg.batch.config.integration;
import mypkg.batch.config.integration.factory.SFTPBeansFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class IngestionBeanConfigurator implements BeanFactoryPostProcessor {
public static final Logger LOG = LoggerFactory.getLogger(IngestionBeanConfigurator.class);
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory fact) throws BeansException {
Environment env = fact.getBean(Environment.class);
String carriersList = env.getProperty("ingestion.carriers");
if (carriersList == null) {
LOG.info("No ingestion has been defined");
return;
}
List<String> carriers = List.of(carriersList.split(","));
for (String carrier : carriers) {
String carrierMethod = env.getProperty("ingestion.%s".formatted(carrier));
if (carrierMethod != null) {
if ("sftp".equals(carrierMethod)) {
new SFTPBeansFactory(carrier, env).loadBeans(fact);
} else {
LOG.warn("Invalid carrier method {} for carrier {}", carrierMethod, carrier);
}
}
}
}
}

这是创建SFTPbeans 的类

package com.eyemed.foodogs.batch.config.integration.factory;
import com.eyemed.foodogs.model.exception.MembersMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.env.Environment;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannelSpec;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.dsl.StandardIntegrationFlow;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter;
import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer;
import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
import java.io.File;
import java.math.BigInteger;
import java.util.concurrent.TimeUnit;
public class SFTPBeansFactory implements BeanFactory {
public static final Logger LOG = LoggerFactory.getLogger(SFTPBeansFactory.class);
private final String carrierId;
private final String sftpHost;
private final String sftpUsername;
private final String sftpPassword;
private final String sftpRemoteDir;
private final String sftpLocalDir;
private final String sftpArchiveDir;
private final String sftpErrorDir;
private final BigInteger pingFrequency;
public SFTPBeansFactory(final String carrierId, final Environment props) {
String prefix = "ingestion.%s".formatted(carrierId);
this.carrierId = carrierId;
this.sftpHost = props.getProperty("%s.sftp.host".formatted(prefix));
this.sftpUsername = props.getProperty("%s.sftp.username".formatted(prefix));
this.sftpPassword = props.getProperty("%s.sftp.password".formatted(prefix));
this.sftpRemoteDir = props.getProperty("%s.sftp.remotedir".formatted(prefix));
this.sftpLocalDir = props.getProperty("%s.sftp.localdir".formatted(prefix));
this.sftpArchiveDir = props.getProperty("%s.sftp.archivedir".formatted(prefix));
this.sftpErrorDir = props.getProperty("%s.sftp.errordir".formatted(prefix));
String pingFrequencyString = props.getProperty("%s.ping".formatted(prefix));
if (pingFrequencyString != null) {
this.pingFrequency = new BigInteger(pingFrequencyString);
} else {
this.pingFrequency = BigInteger.valueOf(3600000);
}
}
public void loadBeans(ConfigurableBeanFactory fact) {
DefaultSftpSessionFactory sf = _buildSessionFactory();
SftpInboundFileSynchronizer sync = _buildInboundFileSynchronizer(sf);
fact.registerSingleton("sftp-sync-%s".formatted(carrierId), sync);
SftpInboundFileSynchronizingMessageSource src = _buildMessageSource(sync);
MembersMessageHandler handler = new MembersMessageHandler(carrierId, fact.getBean(JobLauncher.class), fact.getBean("readMembersJob", Job.class));
String beanName = "sftp-flow-%s".formatted(carrierId);
String channelName = "sftp-ingestion-channel-%s".formatted(carrierId);
LOG.info("Creating bean %s based on channel %s".formatted(beanName, channelName));
StandardIntegrationFlow flow = IntegrationFlows
.from(src, c -> c.poller(Pollers.fixedRate(pingFrequency.longValue(), TimeUnit.MILLISECONDS, 0)))
.channel(channelName)
.handle(handler)
.get();
IntegrationFlowContext ctx = fact.getBean(IntegrationFlowContext.class);
ctx.registration(flow).id(beanName).autoStartup(true).register();
flow.start();
}
private SftpInboundFileSynchronizingMessageSource _buildMessageSource(SftpInboundFileSynchronizer sync) {
var src = new SftpInboundFileSynchronizingMessageSource(sync);
src.setLocalDirectory(new File(sftpLocalDir));
src.setAutoCreateLocalDirectory(true);
src.setLocalFilter(new AcceptOnceFileListFilter<>());
return src;
}
private SftpInboundFileSynchronizer _buildInboundFileSynchronizer(DefaultSftpSessionFactory sf) {
var sync = new SftpInboundFileSynchronizer(sf);
sync.setDeleteRemoteFiles(true);
sync.setRemoteDirectory(sftpRemoteDir);
sync.setFilter(new SftpSimplePatternFileListFilter("*.csv"));
sync.setLocalFilenameGeneratorExpressionString(
"#this.substring(0, #this.length - 4) + '_%s_' + new com.eyemed.foodogs.application.util.TimestampProvider().currentTimestamp() + '.txt'".formatted(carrierId));
return sync;
}
private DefaultSftpSessionFactory _buildSessionFactory() {
var sf = new DefaultSftpSessionFactory();
sf.setHost(sftpHost);
sf.setUser(sftpUsername);
sf.setPassword(sftpPassword);
sf.setPort(22);
sf.setAllowUnknownKeys(true);
return sf;
}
}

不幸的是,这似乎不起作用:SFTP文件没有被读取,并且遗憾地保留在源文件夹中。本地SFTP与先前版本一样;静态";豆子过去能正常工作。

此外,我在日志中没有看到错误

.   ____          _            __ _ _
/\ / ___'_ __ _ _(_)_ __  __ _    
( ( )___ | '_ | '_| | '_ / _` |    
\/  ___)| |_)| | | | | || (_| |  ) ) ) )
'  |____| .__|_| |_|_| |___, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot ::                (v2.5.5)
2022-10-18 11:42:15.396  INFO 8226 --- [           main] com.eyemed.foodogs.application.App       : Starting App using Java 17 on A-M-L-MUTI.local with PID 8226 (/Users/lorenzomuti/Repositories/FooDogs/backend/foodogsbootapplication/target/classes started by lorenzomuti in /Users/lorenzomuti/Repositories/FooDogs/backend/foodogsbootapplication)
2022-10-18 11:42:15.399  INFO 8226 --- [           main] com.eyemed.foodogs.application.App       : The following profiles are active: dev
2022-10-18 11:42:17.988  INFO 8226 --- [           main] c.e.f.b.c.i.factory.SFTPBeansFactory     : Creating bean sftp-flow-carrier1 based on channel sftp-ingestion-channel-carrier1
2022-10-18 11:42:18.028  INFO 8226 --- [           main] c.e.f.b.c.i.factory.SFTPBeansFactory     : Creating bean sftp-flow-carrier2 based on channel sftp-ingestion-channel-carrier2
2022-10-18 11:42:18.038  INFO 8226 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2022-10-18 11:42:18.049  INFO 8226 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2022-10-18 11:42:18.311  INFO 8226 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2022-10-18 11:42:18.322  INFO 8226 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2022-10-18 11:42:18.324  INFO 8226 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2022-10-18 11:42:18.659  INFO 8226 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8080 (http)
2022-10-18 11:42:18.675  INFO 8226 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2022-10-18 11:42:18.676  INFO 8226 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.53]
2022-10-18 11:42:18.816  INFO 8226 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2022-10-18 11:42:18.816  INFO 8226 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 3337 ms
2022-10-18 11:42:18.892 DEBUG 8226 --- [           main] o.s.w.f.CommonsRequestLoggingFilter      : Filter 'logFilter' configured for use
2022-10-18 11:42:19.474  INFO 8226 --- [           main] c.e.f.application.config.UnionPayConfig  : Activating UnionPay Service logger
2022-10-18 11:42:20.478  INFO 8226 --- [           main] o.s.b.c.r.s.JobRepositoryFactoryBean     : No database type set, using meta data indicating: MYSQL
2022-10-18 11:42:20.501  INFO 8226 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
2022-10-18 11:42:20.730  INFO 8226 --- [           main] o.s.s.web.DefaultSecurityFilterChain     : Will secure any request with [org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter@7a0f06ad, org.springframework.security.web.context.SecurityContextPersistenceFilter@3e9fb485, org.springframework.security.web.header.HeaderWriterFilter@580ffea, org.springframework.security.web.authentication.logout.LogoutFilter@7fe87c0e, org.springframework.security.web.authentication.www.BasicAuthenticationFilter@c82d925, org.springframework.security.web.savedrequest.RequestCacheAwareFilter@38dbeb39, org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter@48106381, org.springframework.security.web.authentication.AnonymousAuthenticationFilter@1fa9692b, org.springframework.security.web.session.SessionManagementFilter@2ffb0d10, org.springframework.security.web.access.ExceptionTranslationFilter@f76872f, org.springframework.security.web.access.intercept.FilterSecurityInterceptor@6df2a206]
2022-10-18 11:42:20.849  INFO 8226 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2022-10-18 11:42:20.849  INFO 8226 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2022-10-18 11:42:20.849  INFO 8226 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2022-10-18 11:42:20.867  INFO 8226 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2022-10-18 11:42:20.887  INFO 8226 --- [           main] com.eyemed.foodogs.application.App       : Started App in 6.399 seconds (JVM running for 7.221)

我的方法正确吗?我应该多做点什么吗?请帮帮我,因为我不知道该往哪里撞头:(

考虑将您的逻辑转移到某个@PostConstruct方法中。

我认为访问bean工厂并从BeanFactoryPostProcessor启动bean还为时过早。

也可以考虑使用SftpInboundChannelAdapterSpec而不是手动创建。并且不要手动注册这些bean——依赖于IntegrationFlowContext

我想建议你也调查一下:https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#sftp-轮换服务器建议。

但是看起来您在消息处理程序中使用了carrierId。尽管它可能是一个消息头。也不确定您是否真的需要介于两者之间的.channel(channelName)

最新更新