有效地将结果流存储在多个表中,每个项目具有乐观锁定



给定一个包含大量项目的结果流,我想存储它们并处理潜在的并发冲突:

public void onTriggerEvent(/* params */) {
Stream<Result> results = customThreadPool.submit(/*...complex parallel computation on multiple servers...*/).get();
List<Result> conflicts = store(results);
resolveConflictsInNewTransaction(conflicts);
}

被困在如何有效地实施store(...)上。Result由两个不可变且分离的对象组成,用于描述需要在各自的数据库表中更新的数据。

@Value
public static class Result {
A a; // describes update for row in table a
B b; // describes update for row in table b
}

AB分别引用两个用户,其中(u1, u2)是各自数据库表上的键。

@Value
public static class A {
long u1;
long u2;
// ... computed data fields ...
}
// B accordingly

流计算本身可能会并发触发(并行多次onTriggerEvent调用(,这通常没问题,但有时可能会导致某些结果的冲突(大约 0,1% 是冲突的,例如,一个流有一个结果(53,21)而另一个调用也同时(53,21)更新(。A和/或B的冲突由其updatedAt字段表示,与操作开始时相比会有所不同。当然,在这里,我们不想丢弃所有结果并重试,而只想解决冲突的行。

所以我想知道什么是(1(存储所有不冲突的Result.aResult.b,以及(2(获得冲突中需要特殊处理的ResultList的好方法。

public List<Result> store(Stream<Result> results) {
// store all a
// store all b (ideally without using results * 2 RAM)
// do update other stuff if a and b are not in conflict and do it in the same ACID transaction as the update of the related a and b.
// return those in Conflict
}

如何在不解压缩每个结果、将其发送到数据库自己的事务等的情况下实现它?理想情况下,我需要一次将所有内容发送到数据库并获取尚未存储的冲突列表(另一个应该已经保留(。我也对不同的方法持开放态度。

如果相关,我们使用JPA/Hibernate。

最简单的方法是将持久性简化为 FIFO 队列(存在许多技术,但一般来说,这将变成"每个事务单个条目"的方式,这不是期望的方法(。

因此,对于第二个选项,我会将并发冲突定义的逻辑从数据库持久操作移动到单独的服务。

您可以实现类似 UserId 到重入锁的内存映射(与同步块相比,这些操作非常快(。

在第一次调用持久锁期间,锁被锁定;成功持久化后,锁被释放。同时(在单独的线程中(,您可以检查锁的状态,然后按该状态过滤掉,或者等到锁被释放。小心等待状态:你有流,所以处理流的整个线程将进入等待状态。

就个人而言,我会坚持第一个"每个事务一个条目",中间有一些(持久的(消息传递队列,并有单独的服务用于锁定检查。首先,这将使我们能够轻松配置写入操作的并发性;其次,在编写器中轻松使用等待状态,因为只有一个条目将被锁定。

最新更新