弹簧集成 - 处理过时的 sftp 会话



我已经实现了以下场景:

  1. 一个队列通道,以字节的形式保存消息[]
  2. 消息处理程序,轮询队列通道并通过 sftp 上传文件
  3. 一个转换器,侦听错误通道并将从失败消息中提取的有效负载发送回 queueChannel(被认为是处理失败消息的错误处理程序,因此不会丢失任何内容(

如果 sftp 服务器处于联机状态,则一切按预期工作。

如果 sftp 服务器关闭,则作为转换器到达的错误消息是:

org.springframework.messaging.MessagingException: Failed to obtain pooled item; nested exception is java.lang.IllegalStateException: failed to create SFTP Session

转换器对此无能为力,因为有效负载的 failedMessage 为 null 并本身引发异常。转换器会丢失消息。

如何配置我的流以使转换器获得正确的消息,其中包含未成功上传的文件的相应有效负载?

我的配置:

  @Bean
  public MessageChannel toSftpChannel() {
    final QueueChannel channel = new QueueChannel();
    channel.setLoggingEnabled(true);
    return new QueueChannel();
  }
  @Bean
  public MessageChannel toSplitter() {
    return new PublishSubscribeChannel();
  }
  @Bean
  @ServiceActivator(inputChannel = "toSftpChannel", poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
  public MessageHandler handler() {
    final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
    handler.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
    handler.setFileNameGenerator(message -> {
      if (message.getPayload() instanceof byte[]) {
        return (String) message.getHeaders().get("name");
      } else {
        throw new IllegalArgumentException("byte[] expected in Payload!");
      }
    });
    return handler;
  }
  @Bean
  public SessionFactory<LsEntry> sftpSessionFactory() {
    final DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    final Properties jschProps = new Properties();
    jschProps.put("StrictHostKeyChecking", "no");
    jschProps.put("PreferredAuthentications", "publickey,password");
    factory.setSessionConfig(jschProps);
    factory.setHost(sftpHost);
    factory.setPort(sftpPort);
    factory.setUser(sftpUser);
    if (sftpPrivateKey != null) {
      factory.setPrivateKey(sftpPrivateKey);
      factory.setPrivateKeyPassphrase(sftpPrivateKeyPassphrase);
    } else {
      factory.setPassword(sftpPasword);
    }
    factory.setAllowUnknownKeys(true);
    return new CachingSessionFactory<>(factory);
  }
  @Bean
  @Splitter(inputChannel = "toSplitter")
  public DmsDocumentMessageSplitter splitter() {
    final DmsDocumentMessageSplitter splitter = new DmsDocumentMessageSplitter();
    splitter.setOutputChannelName("toSftpChannel");
    return splitter;
  }
  @Transformer(inputChannel = "errorChannel", outputChannel = "toSftpChannel")
  public Message<?> errorChannelHandler(ErrorMessage errorMessage) throws RuntimeException {
    Message<?> failedMessage = ((MessagingException) errorMessage.getPayload())
      .getFailedMessage();
    return MessageBuilder.withPayload(failedMessage)
                         .copyHeadersIfAbsent(failedMessage.getHeaders())
                         .build();
  }
  @MessagingGateway 
  public interface UploadGateway {
    @Gateway(requestChannel = "toSplitter")
    void upload(@Payload List<byte[]> payload, @Header("header") DmsDocumentUploadRequestHeader header);
  }

谢谢。。

更新

@Bean(PollerMetadata.DEFAULT_POLLER)
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED)
  PollerMetadata poller() {
    return Pollers
      .fixedRate(5000)
      .maxMessagesPerPoll(1)
      .receiveTimeout(500)
      .taskExecutor(taskExecutor())
      .transactionSynchronizationFactory(transactionSynchronizationFactory())
      .get();
  }
  @Bean
  @ServiceActivator(inputChannel = "toMessageStore", poller = @Poller(PollerMetadata.DEFAULT_POLLER))
  public BridgeHandler bridge() {
    BridgeHandler bridgeHandler = new BridgeHandler();
    bridgeHandler.setOutputChannelName("toSftpChannel");
    return bridgeHandler;
  }

null failedMessage是一个错误;转载了INT-4421。

我不建议在这种情况下使用QueueChannel。如果您使用直接渠道,则可以配置重试建议以尝试重新传递。当重试次数用尽(如果已配置(时,异常将引发回调用线程。

将建议添加到SftpMessageHandleradviceChain属性中。

编辑

您可以通过在可轮询通道和 sftp 适配器之间插入网桥来解决"丢失"失败的消息:

@Bean
@ServiceActivator(inputChannel = "toSftpChannel", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "1"))
public BridgeHandler bridge() {
    BridgeHandler bridgeHandler = new BridgeHandler();
    bridgeHandler.setOutputChannelName("toRealSftpChannel");
    return bridgeHandler;
}
@Bean
@ServiceActivator(inputChannel = "toRealSftpChannel")
public MessageHandler handler() {
    final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
    handler.setRemoteDirectoryExpression(new LiteralExpression("foo"));
    handler.setFileNameGenerator(message -> {
        if (message.getPayload() instanceof byte[]) {
            return (String) message.getHeaders().get("name");
        }
        else {
            throw new IllegalArgumentException("byte[] expected in Payload!");
        }
    });
    return handler;
}

最新更新