我的 tasks.py 文件中有三个芹菜@tasks,它们通常由不同的工作人员同时排队和处理,每个工作人员的处理时间相似。 我相信我遇到的问题是,他们都在尝试在其他用户配置文件对象完成之前更新相同的用户配置文件对象。 似乎要完成的三个进程中的最后一个是成功写入数据库的进程。如果我在任务之间运行几秒钟,则一切都会完成。
知道问题是什么,或者有什么方法可以继续尝试保存到配置文件中,直到它真正起作用?
提前感谢您的帮助!
我假设你正在使用django,因为你这样标记了它。如果是这样,您可以使用select_for_update(文档)来锁定对象。这将阻止其他工作线程,直到事务完成。如果任务运行时间较长,则可能会超时,因此请捕获该异常并在必要时重试。
from django.db import transaction
from celery.task import task
@task
def mytask(mpk):
with transaction.commit_on_success():
my_obj = MyModel.objects.select_for_update().get(pk=mpk)
...
请注意,这不适用于 sqlite。
Django ORM可以在这里玩一个把戏。如果使用model_object.save()
方法,它将更新所有字段。如果你的任务是更新同一对象中的不同字段,你可以考虑使用 ModelClass.objects.filter(pk=model_id).update(some_field=some_value)
,但在这里你可能会陷入不同的 RDBMS 如何实现表/行锁定。
另一种选择是使用 Celery Chord 并在完成获取用户数据的所有任务时更新用户配置文件。您可能需要实现分布式信号量,因此唯一的和弦任务将同时为同一用户配置文件执行。
看起来这更像是一个数据库锁定问题。您是否尝试过编辑配置文件并允许数据库上的更多并发?例如在Postgre Debian上编辑你的conf文件:
nano /etc/postgresql/9.4/main/postgresql.conf
然后你可以在 conf 文件中设置这样的东西:
max_connections=100
shared_buffers = 3000MB
temp_buffers = 800MB
effective_io_concurrency = 5
max_worker_processes = 15
这应该允许您在辨别时读/写。