如何通过Java1.7中的SpringDSL在Sftp Inbound适配器中动态传输消息



我有一个Sftp入站流,我从DefaultSftpSessionFactory获得了会话信息。但我需要动态地实现多个会话信息,这些信息将从数据库表中获得。这意味着我有许多Sftp服务器的详细信息需要在集成流中实现。现在我已经完成了从单个源到单个目的地的文件传输,但我需要实现多个源到多个目的地。因此,任何人都可以提供一些关于这一点的指针。

这是我的会话工厂。。。这里我有一个单一的Sftp服务器信息,但如何配置多个服务器的详细信息。

@Autowired
private DefaultSftpSessionFactory sftpSessionFactory;
@Bean
public DefaultSftpSessionFactory sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(
true);
factory.setHost("111.11.12.143");
factory.setPort(22);
factory.setUser("sftp");
factory.setPassword("*******");         
return factory;
}

这是我的Sftp入站流。。

@Bean
public IntegrationFlow sftpInboundFlow() {
System.out.println("enter sftpInboundFlow....."
+ sftpSessionFactory.getSession());     
return IntegrationFlows
.from(Sftp.inboundAdapter(this.sftpSessionFactory)
.preserveTimestamp(true).remoteDirectory(remDir)
.regexFilter(".*\.txt$")
.localFilenameExpression("#this.toUpperCase()")
.localDirectory(new File(localDir))
.remoteFileSeparator("/"),
new Consumer<SourcePollingChannelAdapterSpec>() {
@Override
public void accept(SourcePollingChannelAdapterSpec e) {
e.id("sftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedRate(1000)
.maxMessagesPerPoll(1));
}
})
//.channel(MessageChannels.queue("sftpInboundResultChannel"))
.channel(sftpInboundResultChannel())
.get();
}

按照加里的建议,我正在编辑我的帖子。。。。

嗨,加里,我引用了Github动态FTP的例子。

通过ChannelResolver类,我需要调用上面的DSL类。并在上下文属性中设置动态值,而不使用XML。

在我的ChannelResolver类中,我想要一些类似的东西

StandardEnvironment env = new StandardEnvironment();
Properties props = new Properties();
props.setProperty("inbound.host", host);    //I am getting the value of 'host' from a DB table.
PropertiesPropertySource pps = new PropertiesPropertySource("sftpprop", props);
env.getPropertySources().addLast(pps);
context.setEnvironment(env); 
And my DSL class I need to use like this.
@Value("${inbound.host}")
private String host;
So in this way can I set dynamic value for String 'host' ? 

我正在编辑我的原始帖子。。。。。。。。。。。

In my Outbound dynamic resolver class I am doing like this

StandardEnvironment env = new StandardEnvironment();
Properties props = new Properties();        
props.setProperty("outbound.host", host);
props.setProperty("outbound.port", String.valueOf(port));
props.setProperty("outbound.user", user);
props.setProperty("outbound.password", password);
props.setProperty("outbound.remote.directory", remoteDir);
props.setProperty("outbound.local.directory", localDir);        
PropertiesPropertySource pps = new PropertiesPropertySource("ftpprops", props);
env.getPropertySources().addLast(pps);
ctx.setEnvironment(env);

And this is my dsl class....
@Autowired
private DefaultSftpSessionFactory sftpSessionFactory;
@Bean
public DefaultSftpSessionFactory sftpSessionFactory(@Value("${outbound.host}") String host, @Value("${outbound.port}") int port,
@Value("${outbound.user}") String user, @Value("${outbound.password}") String password
) {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(host);
factory.setPort(port);
factory.setUser(user);
factory.setPassword(password);      
return factory;
}

@Bean
public IntegrationFlow fileInboundFlow(@Value("${outbound.local.directory}") String localDir)
{
return IntegrationFlows
.from(Files.inboundAdapter(new File(localDir)),
new Consumer<SourcePollingChannelAdapterSpec>() {
@Override
public void accept(SourcePollingChannelAdapterSpec e) {
e.autoStartup(true).poller(
Pollers.fixedDelay(5000)
.maxMessagesPerPoll(1));
}
})
.channel(sftpSendChannel())
.get();
}
@Bean
public IntegrationFlow sftpOutboundFlow(@Value("${outbound.remote.directory}") String remDir) {    
return IntegrationFlows
.from(sftpSendChannel())
.handle(Sftp.outboundAdapter(this.sftpSessionFactory)                       
.useTemporaryFileName(false)
.remoteDirectory(remDir))
.get();
}
@Bean
public MessageChannel sftpSendChannel() {
return new DirectChannel();
}
@Bean
public static PropertySourcesPlaceholderConfigurer configurer1() {
return new PropertySourcesPlaceholderConfigurer();      
}

And this the error log from console...

2015年8月3日下午7:50:25 org.apache.catalina.core.StandardContext listenerStart严重:将上下文初始化事件发送到类org.springframework.web.context.ContextLoaderListener的侦听器实例时发生异常org.springframework.beans.factory.BeanCreationException:创建名为"sftpOutBoundDsl"的bean时出错:注入自动连接的依赖项失败;嵌套异常为org.springframework.beans.factory.BeanCreationException:无法自动连接字段:private org.springfframework.integration.sftp.session.DefaultSftpSessionFactory com.tcs.iux.ieg.sftp.dynamic.SftpOutBoundDsl.sftpSessionFactory;嵌套异常为java.lang.IollegalArgumentException:无法解析字符串值"${outbound.host}"中的占位符"outbound.hist"位于org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessPropertyValues(AutowiredAannotationBeanpostProcessor.java:334)网址:org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1204)网址:org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:538)网址:org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:476)位于org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:302)位于org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:229)网址:org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory:java:298)网址:org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:193)位于org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:725)位于org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:757)网址:org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:480)网址:org.springframework.web.context.ContextLoader.configureAndRefreshWebApplicationContext(ContextLoader.java:403)位于org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:306)位于org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:106)网址:org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4973)网址:org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5467)网址:org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150)网址:org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1559)网址:org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1549)位于java.util.concurrent.FFutureTask.run(FutureTask.java:262)位于java.util.concurrent.ThreadPoolExecutiator.runWorker(ThreadPoolExecutiator.java:1145)位于java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)在java.lang.Thread.run(线程.java:744)由:org.springframework.beans.factory.BeanCreationException引起:无法自动连接字段:private org.springfframework.integration.sftp.session.DefaultSftpSessionFactory com.tcs.iux.ieg.sftp.dynamic.SftpOutBoundDsl.sftpSessionFactory;嵌套异常为java.lang.IollegalArgumentException:无法解析字符串值"${outbound.host}"中的占位符"outbound.hist"位于org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAannotationBeanpostProcessor.java:555)网址:org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:87)位于org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessPropertyValues(AutowiredAannotationBeanpostProcessor.java:331)…还有22个由以下原因引起:java.lang.IollegalArgumentException:无法解析字符串值"${outbound.host}"中的占位符"outbound.hhost"位于org.springframework.util.PropertyPlaceholderHelper.parseStringValue(PropertyPlaceholderHelper.java:174)位于org.springframework.util.PropertyPlaceholderHelper.replacePlaceholders(PropertyPlaceholderHelper.java:126)位于org.springframework.core.env.AbstractPropertyResolver.doResolvePlaceholders(AbstractPropertyResolve.java:204)位于org.springframework.core.env.AbstractPropertyResolver.resolveRequiredPlaceholders(AbstractPropertyResolve.java:178)位于org.springframework.context.support.PropertySourcesPlaceholderConfigurer$2.resolveStringValue(PropertySourcesPlaceholdeConfigurer.java:175)网址:org.springframework.beans.factory.support.AbstractBeanFactory.resolveEmbeddedValue(AbstractBeanFactory.java:800)位于org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:917)位于org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:904)网址:org.springframework.beans.factory.support.ConstrutorResolver.resolveAutowiredArgument(ConstructorResolver.java:815)位于org.springframework.beans.factory.support.ConstrutorResolver.createArgumentArray(ConstructorResolver.java:743)位于org.springframework.beans.factory.support.ConstrutorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:466)网址:org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory:java:1113)网址:org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1008)网址:org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:505)网址:org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:476)位于org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:302)位于org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:229)网址:org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory:java:298)网址:org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:193)位于org.springframework.beans.factory.support.DefaultListableBeanFactory.findAutowireCandidate(DefaultListableBeanFactory.java:1088)位于org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1006)位于org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:904)位于org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAannotationBeanpostProcessor.java:527)…还有24个

当前不支持它。

我们有一个开放的JIRA来添加对动态服务器选择的支持,但这不太可能在即将发布的4.2版本中及时完成。

您可以通过编写自己的自定义委派会话工厂来解决这个问题,该工厂使用一些标准(例如ThreadLocal)来确定要使用哪个委派工厂。

编辑:

和XML一样,您需要一个PropertySourcesPlaceholderConfigurerbean。

您还应该使用工厂方法注入,因为@Configuration类创建得太早,无法注入@Value。。。

@Configuration
public class FooConfig {
@Bean
public DefaultSftpSessionFactory factory(
@Value("${inbound.host}") String host, 
@Value("${inbound.port}") int port) {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost(host);
sf.setPort(port);
return sf;
}
@Bean
public PropertySourcesPlaceholderConfigurer configurer() {
return new PropertySourcesPlaceholderConfigurer();
}
}

public class Testing {
@Test
public void test() {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(FooConfig.class);
StandardEnvironment env = new StandardEnvironment();
Properties props = new Properties();
props.setProperty("inbound.host", "bar");
props.setProperty("inbound.port", "23");
PropertiesPropertySource pps = new PropertiesPropertySource("sftpprop", props);
env.getPropertySources().addLast(pps);
context.setEnvironment(env);
context.refresh();
DefaultSftpSessionFactory sessionFactory = context.getBean(DefaultSftpSessionFactory.class);
assertEquals("bar", TestUtils.getPropertyValue(sessionFactory, "host"));
context.close();
}
}

顺便说一句,委派会话工厂毕竟将在4.2中。

EDIT2

您可以避免配置类的早期实例化,并使用全局@Value注入,只要您使PSPCbeanstatic。。。

@Configuration
public class FooConfig {
@Value("${foo}")
public String foo;
@Bean
public String earlyFoo() {
return this.foo;
}
@Bean
public String foo(@Value("${foo}") String foo) {
return foo;
}
@Bean
public static PropertySourcesPlaceholderConfigurer configurer() {
return new PropertySourcesPlaceholderConfigurer();
}
}

在这种情况下,earlyFoo按预期填充。

最新更新