如何为Java DSL集成流添加事务支持



我必须为集成流添加事务支持。假设有3个变压器。第一个和第三个转换器应该在同一事务中完成,但第二个转换器不应该在任何事务中完成。因此,如果在第三变换器中发生错误,则不应当提交来自第一和第三变换的所有改变,而应当提交来自第二变换器的改变。我该怎么做?我尝试添加.transform(FirstMessageTransformer, e -> e.transactional(true)),但随后所有转换器都在一个事务中完成。我还尝试添加.transform(FirstMessageTransformer, e -> e.transactional(false)),但似乎效果不佳,因为即使发生异常,更改也会提交给所有的转换器。

@Bean
public IntegrationFlow myMessageFromMessageAmqpInboundFlow() {
return IntegrationFlows.from(myInboundChannel)
.transform(FirstMessageTransformer)
.transform(SecondMessageTransformer)
.transform(ThirdMessageTransformer)
.channel(anOutputChannel)
.get();
}

这样尝试:

.transform(FirstMessageTransformer, e -> e.transactional(true))
.transform(SecondMessageTransformer, 
e -> e.transactional(
new TransactionInterceptorBuilder()
.transactionManager(txManager)
.propagation(Propagation.NOT_SUPPORTED)
.build()))
.transform(ThirdMessageTransformer)

这样,您将拥有一个以FirstMessageTransformer开始的整个子流的事务,而Propagation.NOT_SUPPORTED将要求SecondMessageTransformer挂起当前事务,并在事务外仅执行此MessageHandler。在完成SecondMessageTransformer的工作后,原始事务应该恢复并继续流的其余部分。

最新更新