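I have implemented the following scenario:
- A queueChannel holding messages in the form of byte[]
- A MessageHandler that polls the queue channel and uploads files via sftp
- A Transformer that listens on the errorChannel and sends the payload extracted from the failed message back to the queueChannel (intended as an error handler for failed messages, so that nothing gets lost)
If the sftp server is online, everything works as expected.
If the sftp server is down, the error message that arrives at the transformer is: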
org.springframework.messaging.MessagingException: Failed to obtain pooled item; nested exception is java.lang.IllegalStateException: failed to create SFTP Session
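The transformer cannot do anything with this: the failedMessage of the payload is null, so the transformer itself throws an exception and the message is lost.
How can I configure my flow so that the transformer receives the right message, with the corresponding payload of the file that failed to upload?
My configuration: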
    // Pollable queue that feeds the SFTP handler.
    @Bean
    public MessageChannel toSftpChannel() {
        final QueueChannel channel = new QueueChannel();
        channel.setLoggingEnabled(true);
        return channel;
    }
    @Bean
    public MessageChannel toSplitter() {
        return new PublishSubscribeChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "toSftpChannel", poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
    public MessageHandler handler() {
        final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
        handler.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
        // The remote file name is taken from the "name" header of each message.
        handler.setFileNameGenerator(message -> {
            if (message.getPayload() instanceof byte[]) {
                return (String) message.getHeaders().get("name");
            } else {
                throw new IllegalArgumentException("byte[] expected in Payload!");
            }
        });
        return handler;
    }
    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        final DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        final Properties jschProps = new Properties();
        jschProps.put("StrictHostKeyChecking", "no");
        jschProps.put("PreferredAuthentications", "publickey,password");
        factory.setSessionConfig(jschProps);
        factory.setHost(sftpHost);
        factory.setPort(sftpPort);
        factory.setUser(sftpUser);
        // Prefer key-based authentication; fall back to the password.
        if (sftpPrivateKey != null) {
            factory.setPrivateKey(sftpPrivateKey);
            factory.setPrivateKeyPassphrase(sftpPrivateKeyPassphrase);
        } else {
            factory.setPassword(sftpPasword);
        }
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(factory);
    }
    @Bean
    @Splitter(inputChannel = "toSplitter")
    public DmsDocumentMessageSplitter splitter() {
        final DmsDocumentMessageSplitter splitter = new DmsDocumentMessageSplitter();
        splitter.setOutputChannelName("toSftpChannel");
        return splitter;
    }

    @Transformer(inputChannel = "errorChannel", outputChannel = "toSftpChannel")
    public Message<?> errorChannelHandler(ErrorMessage errorMessage) throws RuntimeException {
        Message<?> failedMessage = ((MessagingException) errorMessage.getPayload())
                .getFailedMessage();
        return MessageBuilder.withPayload(failedMessage)
                .copyHeadersIfAbsent(failedMessage.getHeaders())
                .build();
    }

    @MessagingGateway
    public interface UploadGateway {
        @Gateway(requestChannel = "toSplitter")
        void upload(@Payload List<byte[]> payload, @Header("header") DmsDocumentUploadRequestHeader header);
    }
Thanks.
UPDATE
    @Bean(PollerMetadata.DEFAULT_POLLER)
    @Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED)
    PollerMetadata poller() {
        return Pollers
                .fixedRate(5000)
                .maxMessagesPerPoll(1)
                .receiveTimeout(500)
                .taskExecutor(taskExecutor())
                .transactionSynchronizationFactory(transactionSynchronizationFactory())
                .get();
    }

    @Bean
    @ServiceActivator(inputChannel = "toMessageStore", poller = @Poller(PollerMetadata.DEFAULT_POLLER))
    public BridgeHandler bridge() {
        BridgeHandler bridgeHandler = new BridgeHandler();
        bridgeHandler.setOutputChannelName("toSftpChannel");
        return bridgeHandler;
    }
The null failedMessage is a bug; reproduced as INT-4421.
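I would not recommend using a QueueChannel for this scenario. If you use a direct channel, you can configure a retry advice to attempt redeliveries. When the retries are exhausted (if so configured), the exception is thrown back to the calling thread.
Add the advice to the adviceChain property of the SftpMessageHandler.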
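To make the retry-then-rethrow behaviour concrete, here is a minimal plain-Java sketch of the idea. It is not Spring's retry advice and uses no Spring classes; RetrySketch, withRetry and their parameters are hypothetical names chosen for illustration. It only shows that the attempts run on the calling thread and that the last failure reaches the caller once the attempts are used up.

```java
import java.util.concurrent.Callable;

/** Plain-Java illustration of retry semantics; hypothetical helper, not a Spring API. */
final class RetrySketch {

    private RetrySketch() {
    }

    /**
     * Runs the action on the calling thread, up to maxAttempts times.
     * Returns the first successful result; if every attempt fails,
     * the exception from the last attempt is rethrown to the caller.
     */
    static <T> T withRetry(int maxAttempts, long backOffMillis, Callable<T> action) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                // Wait before the next attempt, but not after the final one.
                if (attempt < maxAttempts) {
                    Thread.sleep(backOffMillis);
                }
            }
        }
        throw last;
    }
}
```

Because the caller sees the final exception directly, it still holds the original payload and can decide what to do with it, which is the property a queue plus poller does not give you.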
EDIT
You can work around the "missing" failedMessage by inserting a bridge between the pollable channel and the sftp adapter:
    // The poller now drives the bridge, which forwards each message to a direct channel.
    @Bean
    @ServiceActivator(inputChannel = "toSftpChannel", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "1"))
    public BridgeHandler bridge() {
        BridgeHandler bridgeHandler = new BridgeHandler();
        bridgeHandler.setOutputChannelName("toRealSftpChannel");
        return bridgeHandler;
    }

    // The SFTP handler is subscribed to the direct channel, so it needs no poller.
    @Bean
    @ServiceActivator(inputChannel = "toRealSftpChannel")
    public MessageHandler handler() {
        final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
        handler.setRemoteDirectoryExpression(new LiteralExpression("foo"));
        handler.setFileNameGenerator(message -> {
            if (message.getPayload() instanceof byte[]) {
                return (String) message.getHeaders().get("name");
            }
            else {
                throw new IllegalArgumentException("byte[] expected in Payload!");
            }
        });
        return handler;
    }
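As a rough mental model of why the extra hop helps, the plain-Java sketch below assumes that the send through the intermediate channel is the point where the message gets attached to an exception that arrived without one. That is an assumption implied by the workaround, not a description of the framework's internals, and Msg, DeliveryException, Handler and BridgeSketch are all hypothetical stand-ins rather than Spring types.

```java
/** Hypothetical stand-in for a message with a "name" header and a byte[] payload. */
final class Msg {
    final String name;
    final byte[] payload;

    Msg(String name, byte[] payload) {
        this.name = name;
        this.payload = payload;
    }
}

/** Hypothetical stand-in for a messaging exception that may lack the failed message. */
final class DeliveryException extends RuntimeException {
    final Msg failedMessage; // null models the "missing" failedMessage

    DeliveryException(Msg failedMessage, String text, Throwable cause) {
        super(text, cause);
        this.failedMessage = failedMessage;
    }
}

/** Hypothetical stand-in for a message handler. */
interface Handler {
    void handle(Msg message);
}

final class BridgeSketch {

    private BridgeSketch() {
    }

    /**
     * Models the extra hop: forwards to the target and, if the failure
     * carries no message, rethrows it with the in-flight message attached.
     */
    static Handler bridgeTo(Handler target) {
        return message -> {
            try {
                target.handle(message);
            } catch (DeliveryException e) {
                if (e.failedMessage != null) {
                    throw e; // already carries the message, pass it through
                }
                throw new DeliveryException(message, e.getMessage(), e);
            } catch (RuntimeException e) {
                throw new DeliveryException(message, e.getMessage(), e);
            }
        };
    }
}
```

In this model, a handler that throws a DeliveryException with a null failedMessage produces, once wrapped by bridgeTo, an exception whose failedMessage is the message that was being delivered, which is what an error-channel transformer needs in order to requeue the payload.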