RabbitMQ 关闭时停止轮询文件:春季集成



我正在做一个项目,我们正在从 sftp 服务器轮询文件并将其流式传输到 rabbitmq 队列上的对象中。现在,当 rabbitmq 关闭时,它仍然会轮询并从服务器中删除文件,并在 rabbitmq 关闭时将其发送到队列时丢失文件。我正在使用表达式评估请求处理程序建议在成功转换时删除文件。我的代码如下所示:

@Bean
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(sftpProperties.getSftpHost());
factory.setPort(sftpProperties.getSftpPort());
factory.setUser(sftpProperties.getSftpPathUser());
factory.setPassword(sftpProperties.getSftpPathPassword());
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(factory);
}    
@Bean
public SftpRemoteFileTemplate sftpRemoteFileTemplate() {
return new SftpRemoteFileTemplate(sftpSessionFactory());
}
@Bean
@InboundChannelAdapter(channel = TransformerChannel.TRANSFORMER_OUTPUT, autoStartup = "false",
poller = @Poller(value = "customPoller"))
public MessageSource<InputStream> sftpMessageSource() {
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(sftpRemoteFileTemplate,
null);
messageSource.setRemoteDirectory(sftpProperties.getSftpDirPath());
messageSource.setFilter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),
"streaming"));
messageSource.setFilter(new SftpSimplePatternFileListFilter("*.txt"));
return messageSource;
}    

@Bean
@Transformer(inputChannel = TransformerChannel.TRANSFORMER_OUTPUT,
outputChannel = SFTPOutputChannel.SFTP_OUTPUT,
adviceChain = "deleteAdvice")
public org.springframework.integration.transformer.Transformer transformer() {
return new SFTPTransformerService("UTF-8");
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice deleteAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpressionString(
"@sftpRemoteFileTemplate.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(false);
return advice;
}

我不希望在 rabbitmq 服务器关闭时从远程 sftp 服务器中删除/轮询文件。我怎样才能做到这一点?

更新

很抱歉没有提到我正在使用春云流兔子粘合剂。这是变压器服务:

public class SFTPTransformerService extends StreamTransformer {
public SFTPTransformerService(String charset) {
super(charset);
}
@Override
protected Object doTransform(Message<?> message) throws Exception {
String fileName = message.getHeaders().get("file_remoteFile", String.class);
Object fileContents = super.doTransform(message);
return new customFileDTO(fileName, (String) fileContents);
}
}

UPDATE-2我按照建议在自定义轮询器上添加了TransactionSynchronizationFactory。现在它不会在兔子服务器关闭时轮询文件,但是当服务器启动时,它会一遍又一遍地轮询同一个文件!!我想不通为什么?我想我不能在 4.3.2 版本上使用PollerSpec导致 im。

@Bean(name = "customPoller")
public PollerMetadata pollerMetadataDTX(StartStopTrigger startStopTrigger,
CustomTriggerAdvice customTriggerAdvice) {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setAdviceChain(Collections.singletonList(customTriggerAdvice));
pollerMetadata.setTrigger(startStopTrigger);
pollerMetadata.setMaxMessagesPerPoll(Long.valueOf(sftpProperties.getMaxMessagePoll()));
ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor =
new ExpressionEvaluatingTransactionSynchronizationProcessor();
syncProcessor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
syncProcessor.setBeforeCommitChannel(
applicationContext.getBean(TransformerChannel.TRANSFORMER_OUTPUT, MessageChannel.class));
syncProcessor
.setAfterCommitChannel(
applicationContext.getBean(SFTPOutputChannel.SFTP_OUTPUT, MessageChannel.class));
syncProcessor.setAfterCommitExpression(new SpelExpressionParser().parseExpression(
"@sftpRemoteFileTemplate.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])"));
DefaultTransactionSynchronizationFactory defaultTransactionSynchronizationFactory =
new DefaultTransactionSynchronizationFactory(syncProcessor);
pollerMetadata.setTransactionSynchronizationFactory(defaultTransactionSynchronizationFactory);
return pollerMetadata;
}

我不知道您是否需要此信息,但我的CustomTriggerAdviceStartStopTrigger如下所示:

@Component
public class CustomTriggerAdvice extends AbstractMessageSourceAdvice {
@Autowired private StartStopTrigger startStopTrigger;
@Override
public boolean beforeReceive(MessageSource<?> source) {
return true;
}
@Override
public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
if (result == null) {
if (startStopTrigger.getStart()) {
startStopTrigger.stop();
}
} else {
if (!startStopTrigger.getStart()) {
startStopTrigger.stop();
}
}
return result;
}
}

public class StartStopTrigger implements Trigger {
private PeriodicTrigger startTrigger;
private boolean start;
public StartStopTrigger(PeriodicTrigger startTrigger, boolean start) {
this.startTrigger = startTrigger;
this.start = start;
}
@Override
public Date nextExecutionTime(TriggerContext triggerContext) {
if (!start) {
return null;
}
start = true;
return startTrigger.nextExecutionTime(triggerContext);
}
public void stop() {
start = false;
}
public void start() {
start = true;
}
public boolean getStart() {
return this.start;
}
}

好吧,很高兴看到您的SFTPTransformerService并确定在向下代理的情况下应该有异常时如何执行onSuccessExpression

您不仅应该抛出异常不执行删除,还应该考虑添加一个RequestHandlerRetryAdvice将文件重新发送到 RabbitMQ: https://docs.spring.io/spring-integration/docs/5.0.6.RELEASE/reference/html/messaging-endpoints-chapter.html#retry-advice

更新

所以,好吧,既然Gary猜测你在内部处理后使用Spring Cloud Stream向Rabbit Binder发送消息(非常遗憾你最初没有分享这些信息(,你需要看看Binder错误处理这个问题:https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_retry_with_the_rabbitmq_binder

的确,ExpressionEvaluatingRequestHandlerAdvice仅适用于SFTPTransformerService,仅此而已。下游错误(在活页夹中(尚未包含在此过程中。

更新 2

是的。。。我认为 Gary 是对的,除非在customPoller级别而不是该ExpressionEvaluatingRequestHandlerAdvice配置TransactionSynchronizationFactory,否则我们别无选择:表达式评估请求处理程序建议。

DefaultTransactionSynchronizationFactory可以使用ExpressionEvaluatingTransactionSynchronizationProcessor进行配置,其目标与上述ExpressionEvaluatingRequestHandlerAdvice相似,但在事务级别,这将包括从SFTP通道适配器开始并在Rabbit Binder级别结束的发送到AMQP尝试的过程。

有关详细信息,请参阅参考手册:https://docs.spring.io/spring-integration/reference/html/transactions.html#transaction-synchronization。

具有ExpressionEvaluatingRequestHandlerAdvice(和任何AbstractRequestHandlerAdvice(的点,即它们仅在方法周围handleRequestMessage()边界,因此仅在组件期间声明它们。

最新更新