如何确保一个代码块在Python中一次只能由一个请求执行?



我可能遇到了一个经典的竞态条件,需要如何解决这个问题的建议。

VpnProfileTable = sqlalchemy.Table(
"vpn_profile",
metadata,
Column("id", Integer, primary_key=True),
Column("profile", Text(), nullable=False),
Column("server_id", ForeignKey("server.id"), nullable=False, index=True),
Column("device_id", ForeignKey("device.id"), nullable=True, index=True),
)

我有一个VPN配置表,可以一次分配给一个设备。一个配置文件永远不应该分配给多个设备。

  1. 通过这里的查询,我确保只返回没有设备id的未使用配置文件。
  2. 然后我取出所有的配置文件并随机选择一个。然后我更新数据库表,以表明所选的配置文件现在分配给一个设备,不应该给其他人。

然而,我相信在获取和更新记录之间发生了竞争条件,并且我最终有时会有两个用户获得相同的配置文件。

async def get_next_vpn_profile(
self, server_id: str, device_id: str
) -> Optional[str]:
query = (
VpnProfileTable.select()
.where(VpnProfileTable.c.server_id == server_id)
.where(VpnProfileTable.c.device_id == None)
)
async with engine.begin() as conn:
records = (await conn.execute(query)).fetchall()
profiles = []
if records and len(records) > 0:
profiles = [VpnProfile.parse_obj(i) for i in records]
if profiles:
profile: VpnProfile = random.choice(profiles)
query = (
VpnProfileTable.update()
.where(VpnProfileTable.c.id == profile.id)
.values(device_id=device_id)
)
await conn.execute(query)
return profile.profile
else:
return None

我该怎么做才能使这段代码在所有传入请求中只能服务于一个请求来避免这种情况?(应用程序运行在Gunicorn/Uvicorn)?还是有更好的办法?我在考虑单例/信号量,但是我不明白。

正如Marat在评论中提到的,我认为最好的方法是在数据库级别使用锁。我正在使用Postgres,所以我不确定我是否需要nowait=True里面的with_for_update()

async with engine.begin() as conn:
query = (
VpnProfileTable.select()
.where(
VpnProfileTable.c.device_id == None,
)
.with_for_update()
)
record = (await conn.execute(query)).first()
if record:
query = (
VpnProfileTable.update()
.where(VpnProfileTable.c.id == record.id)
.values(device_id=device_id)
)
await conn.execute(query)
await conn.commit()

总而言之,我相信这会获得第一个可用的vpn配置文件,没有任何device_id,锁定它,所以希望其他进程在这里等待,直到行可以再次读取。

然后在同一个事务中,我将获得的vpn配置文件设置为给定的device_id并提交更改。(不确定我是否需要提交,如果已经有一个with engine.begin()语句。它应该自动发生。

我想不出为这种情况编写单元测试的方法,所以我希望有人能验证这一点。.with_for_update()是否足以使其他进程在尝试运行相同的选择语句时等待?

因为如果他们等待,他们不会得到相同的行,因为它已经被分配给另一个device_id,这正是我需要的。

将代码封装在互斥锁中。在Python中,可以使用multiprocessing.Lock;例如:

from multiprocessing import Lock
mutex = Lock()
async def get_next_vpn_profile(
self, server_id: str, device_id: str
) -> Optional[str]:
...
with mutex:
async with engine.begin() as conn:
...

如果使用多个进程,互斥锁通常不起作用,但是这个答案澄清了Gunicorn工作人员可以在使用multiprocessing中的锁时共享一个锁。

最新更新