芹菜:一次调度很多工作



目前,我有大约 ~5000 个作业,我有一个循环调度和 im 调度:

for i, job in enumerate(Jobs):
    res = process_job.apply_async(args=[job], queue='job_queue')

完成循环大约需要 18 秒

我尝试将它们全部作为 group() 调用发送,但似乎也很慢。

关于如何快速调度多个作业的任何建议?

此外,我试图通过多处理并行调度,但线程/进程的开销似乎也抵消了好处

这取决于如何

检索Jobs,但我们使用调度程序任务来处理这个问题,然后我们可以调用调度程序任务。

@task
def process_job(job):
    # do stuff for this job
@task
def dispatcher():
    for job in Jobs:
        process_job.apply_async(args=[job], queue='job_queue')

也许你应该尝试使用独立线程来调度任务的Celery-dispatcher。

您可以在主任务中生成子任务,并在另一个函数中处理每个结果:

def handle_result(root_id, task_id, retval, **kwargs):
    print(retval)
@shared_task
def sqrt(i):
    return i * i
@dispatch(receiver=handle_result)
@shared_task
def calc():
    for i in range(10):
        yield sqrt, (i,)

相关内容

  • 没有找到相关文章