我对芹菜有新手水平的经验。我已经编写了许多任务,包括计划任务和一次性延迟作业,但仅此而已。
我遇到了一个问题,我想创建一个任务来启动 1000 个较小的作业,以减轻队列长度和可能需要数小时才能完成的工作可能出现的任何问题。
当前应用程序依赖于来自外部 API 的信息。可以这么说,用户将他们的帐户与我集成的另一个服务相关联,我想每天更新用户的信息,并更改他们的外部帐户。
我有这样的预定工作
@app.task()
def refresh_accounts():
for account in Account.objects.all():
response = retrieve_account_info(account_id=account.id)
account.data = response.data
account.save()
--
我想要的是这样的东西
@app.task()
def kickoff_refresh():
for account in Account.objects.all()
refresh_account.delay(account_id=account.id)
@app.task()
def refresh_account(account_id=None):
account = Account.objects.get(id=account_id)
response = retrieve_account_info(account_id=account.id)
account.data = response.data
account.save()
我想到的一种方法是将kickoff_refresh
和refresh_account
放在不同的队列中。@app.task(queue=q1)
,@app.task(queue=q2)
...但是,我不知道是否有更好的方法可以做到这一点。在芹菜中,在同一队列上调用任务内的任务似乎是不好的做法 - https://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks 任务kickoff_refresh
将是每隔几个小时运行的定期任务。
我很想听听什么对别人有用。谢谢
from celery import group
@app.task()
def kickoff_refresh(account_id=None):
job = group(refresh_account.s(account_id=account.id) for account in Account.objects.all())()
@app.task()
def refresh_account(account_id=None):
account = Account.objects.get(id=account_id)
response = retrieve_account_info(account_id=account.id)
account.data = response.data
account.save()