带_for_update的SQLAlchemy读取过时数据



我正在编写一个负责更新帐户余额的函数。为了防止并发更新,我首先使用with_for_update()锁定帐户,计算金额,更新余额,然后提交会话。为了模拟并发请求,我派生出两个进程,并在每个进程中运行一次函数。以下是用于计算和更新余额的代码:

session = create_db_session(db_engine)()
session.connection(execution_options={'isolation_level': 'SERIALIZABLE'})
print("&" * 80)
print(f"{process_number} entering!")
print("&" * 80)
accounts = (
session.query(Account)
.filter(Account.id == [some account IDs])
.with_for_update()
.populate_existing()
.all()
)
print("*" * 80)
print(f"{process_number} got here!")
for account in accounts:
print(
f"Account version: {account.version}. Name: {account.name}. Balance: {account.balance}"
)
print(hex(id(session)))
print("*" * 80)
# Calculate the total amount outstanding by account.
for account in accounts:
total_amount = _calculate_total_amount()
if account.balance >= total_amount:
# For accounts with sufficient balance, deduct the amount from the balance.
account.balance -= total_amount
else:
# Otherwise, save them for notification. Code omitted.
print("-" * 80)
print(f"{process_number} committing!")
for li, account in line_items_accounts:
print(
f"Account version: {account.version}. Name: {account.name}. Balance: {account.balance}"
)
print("-" * 80)
session.commit()

这是输出:

&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
0 entering!
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
1 entering!
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
********************************************************************************
0 got here!
Account version: 1. Name: Phi's Account. Balance: 20000.000000
0x7fcb65d7e0d0
********************************************************************************
--------------------------------------------------------------------------------
0 committing!
Account version: 1. Name: Phi's Account. Balance: 19930.010000
--------------------------------------------------------------------------------
********************************************************************************
1 got here!
Account version: 1. Name: Phi's Account. Balance: 20000.000000
0x7fcb65f930a0
********************************************************************************
--------------------------------------------------------------------------------
1 committing!
Account version: 1. Name: Phi's Account. Balance: 19930.010000
--------------------------------------------------------------------------------

0和1是进程号,十六进制数是会话的id。您可以看到锁起了作用(进程0阻止了1,直到0提交(,但1读取了过时的数据:余额应该是19930.01,而不是20000,并且在进程1的输出中;帐户版本";应该是2,而不是1。

我尝试过使用populate_existing(),但没有成功,尽管我怀疑它无论如何都不会有帮助,因为这两个会话是不同的,并且在进程0释放锁之前,进程1的会话不应该填充任何内容。我也尝试过";可重复读取";以及";可串行化";隔离级别,并且由于事务之间的并发更新/读/写依赖关系,预计在进程1中会引发异常,但什么也没发生。

值得注意的是,行为并不一致。当我在本地运行上面的代码块时,一切都能正常工作,但当我用所有代码构建一个Docker容器并在那里运行时,几乎永远不会工作。软件包版本没有差异。我正在使用Postgres和psycopg2。

我现在正把头撞在墙上,试图弄清楚发生了什么。我觉得我可能忽略了一些简单的事情。有什么想法吗?

FOR UPDATE就可以了。手册:

FOR UPDATE导致SELECT语句检索的行锁定,好像要更新一样。这防止了它们被锁定,被其他事务修改或删除,直到当前事务结束。即尝试CCD_ 8的其他事务,这些行的DELETESELECT FOR UPDATESELECT FOR NO KEY UPDATESELECT FOR SHARESELECT FOR KEY SHARE将被阻止直到当前交易结束;

加粗强调mine。

这正是SQLAlchemy的with_for_update()所做的。手册:

在不带参数的情况下调用时,生成的SELECT语句将附加一个FOR UPDATE子句。

但是,当像您一样使用SERIALIZABLE快照隔离操作时,这是多余的工作。手册:

此级别模拟所有提交事务的串行事务执行;就好像事务是一个接一个地、串行地而不是并发地执行一样。

因此,您的代码在竞争条件下是安全的使用FOR UPDATE(推荐!(、使用SERIALIZABLE事务。后者通常要贵得多。您需要为序列化失败做好准备(而不是在显示的代码中(。手册:

。。。与"可重复读取"级别一样,使用该级别的应用程序由于序列化失败,必须准备好重试事务。

房间里的大象:你真的给DB写信了吗CCD_ 20可能在之后发生故障";任务完成";已提前打印。

检查数据库日志中是否存在序列化失败或任何其他异常。如果(毫不奇怪(发现序列化失败,简单的解决方案就是切换到(默认!(READ COMMITED隔离级别。您的手动锁定已经完成了任务。

最新更新