目前,我有大约 ~5000 个作业,我有一个循环调度和 im 调度:
for i, job in enumerate(Jobs):
res = process_job.apply_async(args=[job], queue='job_queue')
完成循环大约需要 18 秒
我尝试将它们全部作为 group() 调用发送,但似乎也很慢。
关于如何快速调度多个作业的任何建议?
此外,我试图通过多处理并行调度,但线程/进程的开销似乎也抵消了好处
这取决于如何
检索Jobs
,但我们使用调度程序任务来处理这个问题,然后我们可以调用调度程序任务。
@task
def process_job(job):
# do stuff for this job
@task
def dispatcher():
for job in Jobs:
process_job.apply_async(args=[job], queue='job_queue')
也许你应该尝试使用独立线程来调度任务的Celery-dispatcher。
您可以在主任务中生成子任务,并在另一个函数中处理每个结果:
def handle_result(root_id, task_id, retval, **kwargs):
print(retval)
@shared_task
def sqrt(i):
return i * i
@dispatch(receiver=handle_result)
@shared_task
def calc():
for i in range(10):
yield sqrt, (i,)