在一个FTP服务器上富集Header,并在另一台FTP服务器上获取Header



我已经能够成功地将文件从一个FTP服务器(source)发送到另一个FTP Server(target)。我首先使用入站适配器将文件从源目录发送到本地目录,然后使用出站适配器将来自本地目录的文件发送到目标目录。到目前为止,这一切都很顺利。

我想实现的是:用哈希代码(使用传输的source上的文件生成)丰富source处的消息标头,然后在target处获取该标头,并将其与

散列代码以下是我迄今为止所尝试的:

Application.java

@SpringBootApplication
public class Application {
@Autowired
private Hashing hashing;
public static ConfigurableApplicationContext context;
public static void main(String[] args) {
context = new SpringApplicationBuilder(Application.class)
.web(false)
.run(args);
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler sourceHandler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("Reply channel isssss:"+message.getHeaders().getReplyChannel());
Object payload = message.getPayload();
System.out.println("Payload: " + payload);
File file = (File) payload;
// enrich header with hash code before sending to target FTP
Message<?> messageOut = MessageBuilder
.withPayload(message.getPayload())
.copyHeadersIfAbsent(message.getHeaders())
.setHeaderIfAbsent("hashCode", hashing.getHashCode(file)).build();
// send to target FTP
System.out.println("Trying to send " + file.getName() + " to target");
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToFtp(messageOut);
}
};
}
}

FileTransferServiceConfig.java

@Configuration
@Component
public class FileTransferServiceConfig {
@Autowired
private ConfigurationService configurationService;
@Autowired
private Hashing hashing;
public static final String FILE_POLLING_DURATION = "5000";
@Bean
public SessionFactory<FTPFile> sourceFtpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost(configurationService.getSourceHostName());
sf.setPort(Integer.parseInt(configurationService.getSourcePort()));
sf.setUsername(configurationService.getSourceUsername());
sf.setPassword(configurationService.getSourcePassword());
return new CachingSessionFactory<>(sf);
}
@Bean
public SessionFactory<FTPFile> targetFtpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost(configurationService.getTargetHostName());
sf.setPort(Integer.parseInt(configurationService.getTargetPort()));
sf.setUsername(configurationService.getTargetUsername());
sf.setPassword(configurationService.getTargetPassword());
return new CachingSessionFactory<>(sf);
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toFtpChannel")
void sendToFtp(Message message);
}
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(sourceFtpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory(configurationService.getSourceDirectory());
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter(
configurationService.getFileMask()));
return fileSynchronizer;
}
@Bean
public AcceptOnceFileListFilter<File> acceptOnceFileListFilter() {
return new AcceptOnceFileListFilter<>();
}
@Bean
@InboundChannelAdapter(channel = "ftpChannel",
poller = @Poller(fixedDelay = FILE_POLLING_DURATION))
public MessageSource<File> ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source
= new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File(configurationService.getLocalDirectory()));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(acceptOnceFileListFilter());
return source;
}
// makes sure transfer continues on connection reset
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setTrapException(true);
advice.setOnFailureExpression("@acceptOnceFileListFilter.remove(payload)");
return advice;
}
@Bean
@ServiceActivator(inputChannel = "toFtpChannel")
public void listenOutboundMessage() {
// tried to subscribe to "toFtpChannel" but this was not triggered
System.out.println("Message received");
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel", adviceChain = "expressionAdvice")
public MessageHandler targetHandler() {
FtpMessageHandler handler = new FtpMessageHandler(targetFtpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression(
configurationService.getTargetDirectory()));
return handler;
}
}

哈希.java

public interface Hashing {
public String getHashCode(File payload);
}

我已经设法在sourceHandler()中丰富了消息,构建了消息并将其发送到target,但我不知道如何在target上接收该消息,以便从消息中获取标头?

如果需要更多信息,请告诉我。我真的很感谢你的帮助。

ftpChannel上有两个订阅者-目标处理程序和您的sourceHandler;除非ftpChannel被声明为pubsub-channel,否则他们将获得备用消息。

您对toFtpChannel的订阅应该没有问题。

启用DEBUG日志记录,以在应用程序上下文启动时查看所有订阅活动。

编辑

@ServiceActivator中移除@Bean——这样的bean必须是MessageHandler

@ServiceActivator(inputChannel = "toFtpChannel")
public void listenOutboundMessage(Message message) {
// tried to subscribe to "toFtpChannel" but this was not triggered
System.out.println("Message received:" + message);
}

对我来说很好…

Payload: /tmp/foo/baz.txt
Trying to send baz.txt to target
Message received:GenericMessage [payload=/tmp/foo/baz.txt, headers={hashCode=foo, id=410eb9a2-fe8b-ea8a-015a-d5896387cf00, timestamp=1509115006278}]

再次;ftpChannel上必须只有一个订阅者,除非您将其设为pubsub。

相关内容

  • 没有找到相关文章

最新更新