将事件发布给SAGA时,没有具有实际交易的EntityManager



我正在使用" Axon 4.0.3 Spring Boot 2 Spring Data(PostgreSQL)"默认配置。

已将事件发布给EventStore并等待@sagaeventhandler抓住它,我收到以下例外:

javax.persistence.transactionRequiredException:没有任何EntityManager 可用于当前线程的实际交易 - 无法可靠 过程"坚持"呼叫 org.springframework.orm.jpa.sharedentitymanagercreator $ sharseentityManagerInagerInvocationHandler.invoke(sharseentitymanagercreator.java:292) 〜[spring-orm-5.1.5.5.release.jar:5.1.5.release] at com.sun.proxy。$ proxy104.persist(未知来源)〜[na:na] java.util.stream.foreachops $ foreachop $ ofref.accept(foreachOps.java:184) 〜[NA:1.8.0_191] java.util.stream.ReferencePipeline $ 3 $ 1.Accept(参考Pipeline.java:193) 〜[NA:1.8.0_191] java.util.arraylist $ arraylistspliterator.foreachremaining(arraylist.java:1382) 〜[NA:1.8.0_191] java.util.stream.abstractpipeline.copyinto(AbstractPipeline.java:481) 〜[NA:1.8.0_191] java.util.stream.abstractpipeline.wrapandcopyinto(AbstractPipeline.java:471) 〜[NA:1.8.0_191] java.util.stream.foreachops $ foreachop.evaluateSequentequent(foreachOps.java:151) 〜[NA:1.8.0_191] java.util.stream.foreachops $ foreachop $ ofref.evaluatsequentequent(foreachOps.java:174) 〜[NA:1.8.0_191] java.util.stream.abstractpipeline.evaluate(AbstractPipeline.java:234) 〜[NA:1.8.0_191] java.util.stream.ReferencePipeline.Foreach(ReferencePipeline.java:418) 〜[NA:1.8.0_191] org.axonframework.eventsourcing.eventstore.jpa.jpaeventstorageengine.appendevents(jpaeventstorageengine.java:276) 〜[axon-eventsourcing-4.0.3.jar:4.0.3] at org.axonframework.eventsourcing.eventstore.abstarteventstorageengine.appendevents(AbstractEventStorageengine.java:98) 〜[Axon-Eventsourcing-4.0.3.Jar:4.0.3]

事件库需要哪些其他配置才能处理此情况?

ps。在该方法上添加 @transactional解决了这个问题,但我不明白为什么这是必要的。

最小代码示例(以下端点127.0.0.1:8080/1工作,但另一个端点127.0.0.1:8080/1不是):

@SpringBootApplication
class TestAxonApplication
class UserId(val userId: String = IdentifierFactory.getInstance().generateIdentifier()) : Serializable
class TestCommand(@TargetAggregateIdentifier val userId: UserId)
class TestedEvent(val userId: UserId)
fun main(args: Array<String>) {
    runApplication<TestAxonApplication>(*args)
}
@RestController
@RequestMapping
class Controller(var commandGateway: CommandGateway, var eventStore: EventStore) {
    @GetMapping("/1")
    fun done(): UserId? {
        return commandGateway.sendAndWait<UserId>(TestCommand(UserId()))
    }
    @GetMapping("/2")
    fun failure() {
        eventStore.publish(
                GenericEventMessage.asEventMessage<Void>(
                        TestedEvent(UserId())
                )
        )
    }
}
@Aggregate
class User() {
    @AggregateIdentifier
    private lateinit var userId: UserId
    @CommandHandler
    constructor(cmd: TestCommand) : this() {
        AggregateLifecycle.apply(TestedEvent(cmd.userId))
    }
    @EventHandler
    fun on(event: TestedEvent) {
        this.userId = event.userId
    }
}
@Saga
@ProcessingGroup("mySaga")
class MySaga {
    @StartSaga
    @SagaEventHandler(associationProperty = "userId")
    fun start(event: TestedEvent) {
        println("DONE ${event.userId.userId}")
    }
}

呼叫之间的区别是,一个人穿过命令总线,而另一个则跳过该命令总线并直接发布到事件总线上。默认情况下,在命令总线上配置了TransactionManager。但是,事件总线并非如此。

这意味着您在没有活动的情况下发布事件。冬眠不喜欢。

解决方案是将@Transactional放在端点上,以确保在存储事件时进行交易。

看起来您忘记设置命令总线以允许交易管理。像以下示例一样添加它,它将起作用:

@Bean
public CommandBus commandBus(TransactionManager transactionManager) {
    return new SimpleCommandBus(transactionManager, NoOpMessageMonitor.INSTANCE);
}

update

我的错,首先我用来与Axon 3.3(不是最后的轴)一起使用,我认为您正在为活动存储使用自定义配置。

Axon Spring Boot默认使用使用内存事件商店,这就是为什么如果您不定义自定义的交易将不起作用。

这是您的configurarion中缺少的内容:

@Bean
public EventStorageEngine eventStorageEngine(EntityManagerProvider entityManagerProvider, TransactionManager transactionManager) {
    return new JpaEventStorageEngine(entityManagerProvider, transactionManager);
}
@Bean
public PlatformTransactionManager transactionManager(EntityManagerFactory emf) {
    return new JpaTransactionManager(emf);
}

´´

这是我为其他提案做的一个简单的例子,它可以很好地工作。

要牢记

另一个有趣的点是,您必须定义聚合存储库,因为Axon试图寻找与定义的聚合匹配的存储库。

@Bean
public Repository<User> documentAggregateRepository(EventStore eventStore) {
    return new EventSourcingRepository<>(User.class, eventStore);
}

对于传奇的同样,您必须注册它们,否则传奇永远不会触发

// remember to call the method sagaName + Configuration
// or you must set up the @Saga configurationBean name pointing this method
@Bean
public SagaConfiguration<MySaga> mySagaConfiguration() {
    return SagaConfiguration.subscribingSagaManager(MySaga.class);
}

在此示例中是事件采购,但是如果您对遗产迁移很有趣,也许您会在迁移过程中使用通用jparepository(如果您需要一个示例,请让我知道)。

hth。

最新更新