在postgres中删除和插入的竞争条件



我正在处理一个节点项目,在该项目中我导入了pg库用于数据库操作。我有一个Kafka队列,从中获取事件并将其存储在数据库中。我从kafka获取订单,每次更新订单时都会生成一个新事件,我需要删除旧的订单详细信息,并用新的订单替换它们。

以下是代码

async function saveOrders(orders: Array<TransactionOnOrders>) {
const client = await pool.connect()

try {
await client.query('BEGIN')
if (orders.length) {
const deleted = await deleteOrders(client, orders[0].orderId)
logger.debug(`deleted rowCount ${deleted.rowCount} ${orders[0].orderId}`)
}
const queries = orders.map(ord => saveTransactionOnOrders(client, ord))
await Promise.all(queries)
await client.query('COMMIT')
} catch (e) {
await client.query('ROLLBACK')
throw e
} finally {
client.release()
}
}

订单更新非常频繁,我们收到了许多事件,这些事件造成了比赛条件,导致记录不会被删除,并插入了额外的记录。例如:假设我们收到了order123的一个事件,并且事务正在处理中,直到它完成为止,还收到了另一个order123的事件,因此删除查询返回0个受影响的行,插入查询插入另一个导致2行的行,而应该只存在一条记录。

我试图改变隔离级别,但效果不佳,导致错误

await client.query('BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ')
await client.query('BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE')

我在这里犯了什么错误吗?或者有更好的方法来处理以上情况吗?

如果更新行而不是删除并重新创建行,这可能会更容易。在这种情况下,可以依靠行锁来防止并发更新。

使用INSERT ... ON CONFLICT"upstart"传入行。这是原子的,没有种族条件。

正如其他人所建议的那样,这里的理想选择是使用INSERT ... ON CONFLICT来原子化地执行此操作。如果看不到deleteOrderssaveTransactionOrders的内容,我就无能为力。

如果这不是一个选项,那么应该使用SERIALIZABLE作为隔离级别。然后您会得到一些串行化错误,但这些错误可以安全地重试。如果您使用@databases(https://www.atdatabases.org/docs/pg-guide-transactions)您可以通过retrySerializationFailures: true:重试

async function saveOrders(orders: Array<TransactionOnOrders>) {
await pool.tx(async client => 
if (orders.length) {
const deleted = await deleteOrders(client, orders[0].orderId)
logger.debug(`deleted rowCount ${deleted.rowCount} ${orders[0].orderId}`)
}
const queries = orders.map(ord => saveTransactionOnOrders(client, ord))
await Promise.all(queries)
}, {
isolationLevel: IsolationLevel.SERIALIZABLE,
retrySerializationFailures: true,
})
}

@数据库处理启动事务,并在异步回调结束时提交/回滚。它还会在串行化失败时重试。

如果处理的事件量非常大,则可能会因为序列化失败的频率很高而遇到性能问题,因此会重试。

您可以使用";锁定";在node.js中,以确保一次只有一个进程更新给定的订单集。https://www.atdatabases.org/docs/lock应该使其实现起来相当简单。不过,这只会锁定单个node.js进程上的并发事务,因此您仍然需要事务处理来处理多个node.js过程。

最新更新