透明地批处理存储



我们使用以下框架和版本:

  • jOOQ 3.11.1
  • Spring Boot 2.3.1.RELEASE
  • Spring 5.2.7.RELEASE

我有一个问题,我们的一些业务逻辑被划分为如下逻辑单元:

  • 收到包含用户事务的请求
  • 该请求包含各种信息,如交易类型、哪些产品是该交易的一部分、进行了何种付款等
  • 然后将这些属性分别存储在数据库中

在代码中,大致如下所示:

TransactionRecord transaction = transactionRepository.create();
transaction.create(creationCommand);`

Transaction#create(以事务方式运行)中,会出现以下情况:

storeTransaction();
storePayments();
storeProducts();
// ... other relevant information

一个给定的事务可以有许多不同类型的产品和属性,所有这些都是存储的。其中许多属性导致UPDATE语句,而有些属性可能导致INSERT语句——很难提前完全了解。

例如,storeProducts方法大致如下:

products.forEach(product -> {
ProductRecord record = productRepository.findProductByX(...);
if (record == null) {
record = productRepository.create();
record.setX(...);
record.store();
} else {
// do something else
}
});

如果产品是新的,则为INSERT。否则,可能会进行其他计算。根据事务的大小,这个单用户事务显然可能导致多达O(n)的数据库调用/往返,甚至更多,这取决于存在哪些其他属性。在存在大量属性的事务中,这可能会导致对单个请求进行数百次数据库调用(!)。我想把它降到尽可能接近O(1),以便在我们的数据库中有更可预测的负载。

这里自然会想到批量和批量插入/更新。我想做的是使用jOOQ将所有这些语句批处理到一个单独的批处理中,并在提交之前在成功调用方法之后执行。我发现有几篇文章(SO Post、jOOQ API、jOOQGitHub Feature Request)隐含地提到了这个主题,还有一篇用户组的文章似乎与我的问题明确相关。

由于我将SpringjOOQ一起使用,我相信我的理想解决方案(最好是声明性的)如下所示:

@Batched(100) // batch size as parameter, potentially
@Transactional
public void createTransaction(CreationCommand creationCommand) {
// all inserts/updates above are added to a batch and executed on successful invocation
}

为了实现这一点,我想我需要管理一个作用域(ThreadLocal/Transactional/Session作用域)资源,该资源可以跟踪当前批次,以便:

  1. 在输入方法之前,如果方法是@Batched,则会创建一个空批
  2. 通过DI提供的自定义DSLContext(可能扩展了DefaultDSLContext)具有ThreadLocal标志,该标志跟踪是否应批处理任何当前语句,如果是
  3. 截取调用并将其添加到当前批处理中,而不是立即执行它们

然而,第3步将需要重写(IMO)相对可读的大部分代码:

records.forEach(record -> {
record.setX(...);
// ...
record.store();
}

至:

userObjects.forEach(userObject -> {
dslContext.insertInto(...).values(userObject.getX(), ...).execute();
}

因为第二形式也可以使用DSLContext#batchStoreDSLContext#batchInsert重写。然而,IMO认为,批处理和批量插入不应由单个开发人员决定,应能够在更高级别上透明地处理(例如,由框架)。

我发现jOOQAPI的可读性是使用它的一个惊人的好处,但它似乎不适合(据我所知)在这样的情况下进行拦截/扩展。使用jOOQ 3.11.1(甚至是当前的)API,是否可以通过透明的批处理/批量处理获得与前者类似的行为?这意味着什么?


编辑:

为实现商店的透明批处理,脑海中浮现的一个可能但非常棘手的解决方案如下:

  1. 创建一个RecordListener,并在启用批处理时将其作为默认值添加到Configuration
  2. RecordListener#storeStart中,将查询添加到当前事务的批次中(例如在ThreadLocal<List>中)
  3. AbstractRecord具有在存储之前被检查的changed标志(org.jooq.impl.UpdatableRecordImpl#store0org.jooq.impl.TableRecordImpl#addChangedValues)。重置此项(并保存以备日后使用)会使存储操作无效
  4. 最后,在方法调用成功但在提交之前:
  • 将相应记录的changes标志重置为正确值
  • 调用org.jooq.UpdatableRecord#store,这一次不使用RecordListener,或者跳过storeStart方法(可能使用另一个ThreadLocal标志来检查是否已经执行了批处理)

据我所知,这种方法理论上应该是有效的。显然,如果代码依赖于Reflection来工作,库内部可能随时发生变化,因此它非常容易被破解。

有人知道只使用公共jOOQAPI的更好方法吗?

jOOQ 3.14解决方案

您已经发现了相关的特性请求#3419,它将从jOOQ 3.14开始在JDBC级别上解决这个问题。您可以直接使用BatchedConnection,包装您自己的连接来实现以下内容,也可以使用此API:

ctx.batched(c -> {
// Make sure all records are attached to c, not ctx, e.g. by fetching from c.dsl()
records.forEach(record -> {
record.setX(...);
// ...
record.store();
}
});

jOOQ 3.13及解决方案之前

目前,在实现#3419之前(在jOOQ 3.14中会实现),您可以自己实现它作为一种变通方法。您必须代理JDBCConnectionPreparedStatement,然后。。。

。。。全部拦截:

  • 调用Connection.prepareStatement(String),如果SQL字符串与上次准备的语句相同,则返回缓存的代理语句,或者批量执行上次准备的声明并创建新的代理语句
  • 调用PreparedStatement.executeUpdate()execute(),并将其替换为对PreparedStatement.addBatch()的调用

。。。全部委派:

  • 调用其他API,例如Connection.createStatement(),它应该刷新上面缓冲的批,然后改为调用委托API

我不建议你绕过jOOQ的RecordListener和其他SPI,我认为缓冲数据库交互的抽象级别是错误的。此外,您还需要批处理其他语句类型。

请注意,在默认情况下,jOOQ的UpdatableRecord会尝试获取生成的标识值(请参阅Settings.returnIdentityOnUpdatableRecord),这会阻止批处理。这样的store()调用必须立即执行,因为您可能希望标识值可用。

相关内容

  • 没有找到相关文章

最新更新