弹簧集成事务同步工厂没有轮询器



根据下面的代码,似乎transactionSynchronizationFactory只可用于轮询器。尽管如此,轮询者还是引入了一个新线程。那么问题来了:如何停留在第一个事务中,却使用事务同步工厂?流程应该是这样的(使用直接通道而不是队列(:

网关 -> JPA 更新网关 ->直接通道(保持在同一线程中(->文件出站适配器

@MessagingGateway
public interface School {
@Gateway(requestChannel = "school.input")
void add(StudentDomain student);
}
@Bean
public IntegrationFlow school() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory)
.entityClass(StudentDomain.class)
.persistMode(PersistMode.PERSIST), e -> e.transactional(true))
.handle("conditionalService", "databaseTransaction")
.channel(fileWritingChannel());
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
ConditionalService conditionalService(){
return new ConditionalService();
}
@Bean
public QueueChannel fileWritingChannel() {
return new QueueChannel();              // Queue for the poller (transactionSynchronizationFactory is only for pallable resources ?)
}
@Bean
public IntegrationFlow fileWritingFlow() {
return IntegrationFlows.from(fileWritingChannel())
.handle("conditionalService", "fileTransaction", e -> e.poller(Pollers
.fixedDelay(1000)
.transactionSynchronizationFactory(transactionSynchronizationFactory())
.transactional(pseudoTransactionManager())))
.transform(Transformers.toJson())
.enrichHeaders(h -> h.header("file_name", "student.json"))
.handle(Files.outboundAdapter(new File( "." + File.separator + "output2")).autoCreateDirectory(true))
.get();
}
@Bean
PseudoTransactionManager pseudoTransactionManager(){
PseudoTransactionManager pseudoTransactionManager = new PseudoTransactionManager();
return pseudoTransactionManager;
}
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
TransacSynchro syncProcessor = new TransacSynchro();
return new DefaultTransactionSynchronizationFactory(syncProcessor);
}
class TransacSynchro implements TransactionSynchronizationProcessor {
@Override
public void processBeforeCommit(IntegrationResourceHolder holder) {
//System.out.println(holder.getMessage());
}
@Override
public void processAfterCommit(IntegrationResourceHolder holder) {
Message message = holder.getMessage();
if(message != null){
System.out.println("processAfterCommit: " + holder.getMessage());
}
}
@Override
public void processAfterRollback(IntegrationResourceHolder holder) {
Message message = holder.getMessage();
if(message != null){
System.out.println("processAfterRollback: " + holder.getMessage());
}
}
}

否则,事务同步工厂可以与 ImapIdleChannelAdapter 一起使用:

@Bean
public ImapIdleChannelAdapter imapIdleChannelAdapter(ImapMailReceiver imapMailReceiver) {
ImapIdleChannelAdapter imapIdleChannelAdapter = new ImapIdleChannelAdapter(imapMailReceiver);
imapIdleChannelAdapter.setAutoStartup(true);
imapIdleChannelAdapter.setOutputChannel(receiveChannel());
imapIdleChannelAdapter.setTransactionSynchronizationFactory(transactionSynchronizationFactory());
return imapIdleChannelAdapter;
}

但是使用输出文件适配器呢? 谢谢。

TransactionSynchronizationFactory

实际上是为那些不是从最终用户代码启动的组件引入的。轮询器和ImapIdleChannelAdapter是那些在自己的线程中完成工作的组件的示例,您无法从它们访问 TX 完成。

常规.handle(Jpa.updatingGateway(this.entityManagerFactory)更像是最终用户事件驱动的组件,因此您可以try...catch其事务完成以执行某些特定逻辑。

ExpressionEvaluatingRequestHandlerAdvice可能会与上述e.transactional(true)一起帮助您。因此,您将调用包装为一些建议,该建议将对正常完成和下游抛出的异常做出反应。

在文档中查看详细信息:https://docs.spring.io/spring-integration/reference/html/messaging-endpoints.html#message-handler-advice-chain

感谢Gary和Artem的快速回复。 因此,在主程序上,我可以@Transactional使用:

@Transactional
public void manyStudentsInTheSameTransaction(){
StudentDomain morgane = new StudentDomain("C", 20);     
school.add(morgane); // no exception => added to the transaction
StudentDomain loic = new StudentDomain("D", 50);            
school.add(loic); // exception in the same transaction => C and D are rollback in the database
}

并尝试...抓住:

try{
conditionalService.manyStudentsInTheSameTransaction();
} catch(Exception e){
System.out.println("Catched ecxception: " + e);
}

它在数据库中工作:两个学生都被回滚,因为第二个学生发生异常。好!

工作流如下所示:

网关 -> JPA 更新网关 -> 直接通道 -> 文件出站适配器

但是我现在要做的是像事务对数据库一样管理文件。好的,您建议处理 catch 子句中的文件。尽管如此,我不想在 catch 子句中处理它,因为由于数据库事务在幕后扮演它们的角色,我希望对文件做同样的事情。

然后我尝试了你的解决方案Artems与表达式评估请求处理程序建议:

@Value("${orcha.rollback-transaction-directory}")
String rollbackTransactionDirectory;
@MessagingGateway
public interface School {
@Gateway(requestChannel = "school.input")
@Transactional
void add(StudentDomain student);
}
@Bean
public IntegrationFlow school() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory)
.entityClass(StudentDomain.class)
.persistMode(PersistMode.PERSIST), e -> e.transactional(true))
.handle("conditionalService", "databaseTransaction")
.channel(fileWritingChannel());
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
ConditionalService conditionalService(){
return new ConditionalService();
}
@Bean
public DirectChannel fileWritingChannel() {
return new DirectChannel();
}
@Bean
public IntegrationFlow fileWritingFlow() {
return IntegrationFlows.from(fileWritingChannel())
.handle("conditionalService", "fileTransaction", c -> c.transactional(true).advice(expressionAdvice()))
.enrichHeaders(s -> s.headerExpressions(h -> h.put("file_name", "payload.getFirstName()")))
.transform(Transformers.toJson())
.handle(Files.outboundAdapter(new File( "." + File.separator + "output2")).autoCreateDirectory(true))
.get();
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
//advice.setSuccessChannelName("success.input");
//advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString("payload");
advice.setTrapException(false);
return advice;
}
/*@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}*/
@Bean
public IntegrationFlow failure() {
return f -> f.<ExpressionEvaluatingRequestHandlerAdvice.MessageHandlingExpressionEvaluatingAdviceException, StudentDomain>transform(m -> (StudentDomain) m.getEvaluationResult())
.enrichHeaders(s -> s.headerExpressions(h -> h.put("file_name", "payload.getFirstName()")))
.transform(Transformers.toJson())
.handle(Files.outboundAdapter(new File( "." + File.separator + rollbackTransactionDirectory))
.autoCreateDirectory(true));
}

现在,工作流如下所示:

网关 -> JPA 更新网关 -> 直接通道(保持在同一线程中(->文件出站适配器 <=> 表达式评估请求处理程序建议

正如预期的那样,主程序的两个学生在数据库中回滚(感谢@Transactional(,但只有一个学生最终被视为文件出站适配器的回滚

@Transactional
public void manyStudentsInTheSameTransaction(){
StudentDomain morgane = new StudentDomain("C", 20);     
// no exception => added to the database transaction and (unfortunatly) considered as ok for the outbound file adapter
school.add(morgane);
StudentDomain loic = new StudentDomain("D", 50);    
// exception in the same transaction => C and D are rollback in the database (good), D is considered as non ok for the outbound file adapter (good), but C not (bad)
school.add(loic);
}

所以,总结并回到原始帖子的主题,我虽然 transactionSynchronizationFactory 是处理数据库事务等文件事务的解决方案,但正如您所说 Artem:

TransactionSynchronizationFactory 实际上是为那些不是从最终用户代码启动的组件引入的

但问题是一样的:如何将 transactionSynchronizationFactory 或等效的解决方案用于从最终用户代码启动的组件。

对不起,这个长问题。谢谢。

最新更新