假设您有两个不同的微服务(Customer和Account),它们都作为Spring Boot应用程序运行在Docker容器中。每次创建一个新客户时,也应该创建一个相应的帐户。为了编排这个流程,我有第三个"服务"。实现基于编排的传奇逻辑。
传奇"服务"包含以下代码:
@Saga
public class CustomerAccountSaga {
private static final String ACCOUNT_CREATION_DEADLINE = "sagas.account-creation-deadline";
private static final Logger LOGGER = LogManager.getLogger(CustomerAccountSaga.class);
@Autowired
private transient CommandGateway commandGateway;
@Autowired
private transient DeadlineManager deadlineManager;
private String customerId;
private String accountDeadlineId;
@StartSaga
@SagaEventHandler(associationProperty = "aggregateId")
private void on(CustomerCreatedEvent event) {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("A new customer has been created for id = '{}'", event.getAggregateId());
}
this.customerId = event.getAggregateId().getId();
SagaLifecycle.associateWith("customerId", customerId);
//Both services has a CustomerId class defined in another package.
b.t.c.a.v.CustomerId id = new b.t.c.a.v.CustomerId(customerId);
CreateAccountCommand createAccount = new CreateAccountCommand(id);
commandGateway.send(createAccount);
this.accountDeadlineId = deadlineManager.schedule(Duration.ofDays(1), ACCOUNT_CREATION_DEADLINE);
}
@SagaEventHandler(associationProperty = "aggregateId")
private void on(CustomerDeletedEvent event) {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("A customer with id '{}' has been deleted. "
+ "The customer was deleted before the account was created, "
+ "or the request to create the account timed-out",
event.getAggregateId());
}
deadlineManager.cancelSchedule(ACCOUNT_CREATION_DEADLINE, accountDeadlineId);
SagaLifecycle.end();
}
@SagaEventHandler(associationProperty = "customerId")
private void on(AccountCreatedEvent event) {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("A corresponding account for customer with id '{}' has been created",
event.getCustomerId());
}
deadlineManager.cancelSchedule(ACCOUNT_CREATION_DEADLINE, accountDeadlineId);
SagaLifecycle.end();
}
@DeadlineHandler(deadlineName = ACCOUNT_CREATION_DEADLINE)
public void on() {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("Failed to create a new account for customer with id '{}' in a timely fashion",
customerId);
}
//Both services has a CustomerId class defined in another package.
b.t.c.c.v.CustomerId id = new b.t.c.c.v.CustomerId(customerId);
DeleteCustomerCommand deleteCustomer = new DeleteCustomerCommand(id);
commandGateway.send(deleteCustomer);
}
}
当所有服务都启动并运行时,一切都按预期工作。CustomerCreatedEvent由传奇处理程序处理,并按预期触发CreateAccountCommand。后者将导致创建帐户并触发AccountCreatedEvent,该事件也由传奇逻辑处理。
当我尝试以下场景时,问题出现了。在所有情况下,客户服务都在运行。
场景
- 使用客户服务创建新客户。
- 启动帐号服务。由于帐户服务没有侦听来自客户服务的任何事件,因此没有发生预期的任何事情。
- 启动saga服务我希望saga服务接收一个之前没有处理过的CustomerCreatedEvent,以协调相应帐户的创建。
情形B
- 使用客户服务创建新客户。
- 启动saga服务我希望saga服务处理CustomerCreatedEvent,但我没有收到事件。
- 启动帐号服务。我希望帐户服务接收来自saga服务的CreateAccountCommand,但由于步骤2(在此流程中)没有执行,因此没有执行。
情形C
客户服务和帐户服务都已启动并运行。服务下线。
- 创建新客户
- 启动saga服务同样,我希望saga服务能够拾取CustomerCreatedEvent并继续,但它没有。
由于在所有情况下预期的行为都没有发生,因此应用程序处于不一致状态,因为不允许存在没有相应帐户的客户。
saga服务使用以下配置为MySQL配置了一个用于Quartz和Axon的持久存储。Axon使用Jackson序列化器处理事件。
# Persistence configuration (MySQL)
###################################
# Quartz persistence
spring.quartz.job-store-type=jdbc
spring.quartz.jdbc.initialize-schema=always
spring.quartz.properties.org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
spring.quartz.properties.org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
spring.quartz.properties.org.quartz.jobStore.dataSource=dsQuartz
spring.quartz.properties.org.quartz.jobStore.tablePrefix=QRTZ_
spring.quartz.properties.org.quartz.dataSource.dsQuartz.user = ******
spring.quartz.properties.org.quartz.dataSource.dsQuartz.password = *******
spring.quartz.properties.org.quartz.dataSource.dsQuartz.maxConnections = 10
spring.quartz.properties.org.quartz.dataSource.dsQuartz.driver = com.mysql.cj.jdbc.Driver
spring.quartz.properties.org.quartz.dataSource.dsQuartz.URL = jdbc:mysql://192.168.99.100:3306/saga-store
# Axon persistence
spring.datasource.url=jdbc:mysql://192.168.99.100:3306/saga-store
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.datasource.username=******
spring.datasource.password=*****
spring.jpa.database-platform=org.hibernate.dialect.MySQL8Dialect
spring.jpa.hibernate.ddl-auto=update
问题:是我误解了微服务和传奇编排的一些基本概念,还是我在设置/设计包含业务逻辑和传奇编排逻辑的不同微服务的方式中忽略了一些东西?
感谢您阅读我的文章并指出我的错误。
在不了解您的设置的情况下,我想说这完全取决于配置。因此,基本上当您启动@Saga
时,它在引擎盖下也有一个Streaming Event Processor
,它可以启动tail
(最旧的事件)或head
(最新的事件)。
Saga流处理器的默认值是head
,正如我们的参考指南所述。如果没有配置,您的Saga将只对新的事件而不是过去的事件作出反应-您应该非常小心,并在从一个事件更改到另一个事件之前好好考虑一下。
另一个需要注意的重点是Saga Store
,如果没有配置,它将使用InMemorySagaStore
。当然,这可能不是理想的,您需要配置一个持久的。所有的部件都可以再次在我们的参考指南上找到。