我正在尝试实现任务分配系统。用户可以从池中请求任务。即使设置为可序列化,事务有时也会将相同的任务提供给多个用户,即使它不应该这样做。
简化架构:
CREATE TABLE tasks(
_id CHAR(24) PRIMARY KEY,
totalInstances BIGINT NOT NULL
);
CREATE TABLE assigned(
_id CHAR(24) PRIMARY KEY,
_task CHAR(24) NOT NULL
);
任务表充满了许多行,假设每行都有totalInstances = 1
,这意味着每个任务最多应该分配一次。
查询以在assigned
中添加行:
WITH task_instances AS (
SELECT t._id, t.totalInstances - COUNT(assigned._id) openInstances
FROM tasks t
LEFT JOIN assigned ON t._id = assigned._task
GROUP BY t._id, t.totalInstances
),
selected_task AS (
SELECT _id
FROM task_instances
WHERE openInstances > 0
LIMIT 1
)
INSERT INTO assigned(_id, _task)
SELECT $1, _id
FROM selected_task;
$1
是传递给每个查询的随机 ID。
症状
我们有大约 100 个活动用户请求任务。这按预期工作,除了 1000 个请求中的一次。 然后,根据并行请求为同一_task
id 创建两个assigned
行。我希望可序列化的执行会回滚第二个,因为第一个应该将 openInstances 减少到 0。
设置
我们使用 Postgres 10.3,查询通过带有withTransactionIsolation(Serializable)
的 Slick 3.2.3 从 Scala 代码运行。没有其他查询从assigned
表中删除或插入到表中。
Postgres 日志显示请求在不同的会话中运行,并且SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE;
在每个任务分配查询之前执行。
我尝试以不同的样式重写查询,为WITH
子查询灌输VIEW
s 的使用,并用BEGIN
和COMMIT
包围查询,但没有效果。
任何帮助,不胜感激。
编辑
我应该补充一点,有时确实会出现预期的序列化错误/回滚,我们的应用程序会在这些错误/回滚上重试查询。如上所述,我在过去几个小时的日志中看到了 10 次这种正确的行为,但 2 次它仍然错误地分配了两次相同的任务。
可序列化隔离级别并不意味着事务实际上是串行的。它仅保证读取提交、可重复读取和没有幻像读取。你所描述的行为看起来并不违规。
为避免重复记录,您可以简单地执行
select ... from task_instances for update
由于此"for update"子句,所选行将在事务生存期内被锁定。因此,只有一个事务能够更新,第二个事务必须等到第一个事务提交。因此,第二个事务将读取第一个事务更新的值 - 这是您在这里需要的保证。
同样重要的是,如果在这种情况下使用"选择更新",则甚至不需要可序列化隔离级别,提交读取就足够了。
我像这样尝试过你的例子:
第一节:
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
WITH task_instances AS (
SELECT t._id, t.totalInstances - COUNT(assigned._id) openInstances
FROM tasks t
LEFT JOIN assigned ON t._id = assigned._task
GROUP BY t._id, t.totalInstances
),
selected_task AS (
SELECT _id
FROM task_instances
WHERE openInstances > 0
LIMIT 1
)
INSERT INTO assigned(_id, _task)
SELECT 1, _id
FROM selected_task;
第二节:
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
WITH task_instances AS (
SELECT t._id, t.totalInstances - COUNT(assigned._id) openInstances
FROM tasks t
LEFT JOIN assigned ON t._id = assigned._task
GROUP BY t._id, t.totalInstances
),
selected_task AS (
SELECT _id
FROM task_instances
WHERE openInstances > 0
LIMIT 1
)
INSERT INTO assigned(_id, _task)
SELECT 2, _id
FROM selected_task;
COMMIT;
第一节:
COMMIT;
这就是我得到的:
ERROR: could not serialize access due to read/write dependencies among transactions
DETAIL: Reason code: Canceled on identification as a pivot, during commit attempt.
HINT: The transaction might succeed if retried.
所以它按预期工作。
我唯一的解释是您的设置有问题,毕竟您没有使用SERIALIZABLE
。
您是否曾经在应用程序中看到序列化错误?如果没有,那将证实我的怀疑。