我正在尝试在celener中构建以下工作流:
(chainA)
task1 -- task2 -- task3--|
|
(chainB) | (group)
task1 -- task2 -- task3--|-------------
|
(chainC) |
task1 -- task2 -- task3--|
.
.
.
我最终得到了这个代码:
list_chains = build_s_chains()
group(*list_chains)()
执行线组(*list_chains(((的那一刻,一切都停止了停顿。看起来像是死锁,没有抛出错误。
如果我尝试在for循环中执行链,一切都很好,但如果我在for循环执行它们,我将无法在for循环结束时连接另一个任务。我知道这就是和弦的定义,我也试过和弦,它仍然是阻塞的。
检查了我的rabbitmq和后端结果,一切似乎都很好,因为我可以手动运行链。在我看来,这应该很简单,但我看不出它不起作用的原因。感谢提供的任何帮助
例如,chainA看起来像这样:
job_chain = (
process_task.s(chip_measurement_object.raw_result_ref,
process_args,
process_args['file_path'],
process_args['meas_data'],
process_args['marker_data'],
process_args['session']
) |
update_marker_data.s() |
plot_task.s(chip_measurement_object.id) |
grade_task.s(chip_measurement_object.id) |
postgres_async_res_update.s(chip_measurement_object.id, self.input_args)
)
正如我提到的,job_chain.apply_async((执行得很好,但当多个链在组中时,它会挂起或阻塞。我已经看到了其他答案和文件,根据他们的说法,这应该是可行的。
这是我的芹菜设置:
# Sensible settings for celery
CELERY_ALWAYS_EAGER = False
CELERY_ACKS_LATE = True
CELERY_TASK_PUBLISH_RETRY = True
CELERY_DISABLE_RATE_LIMITS = False
# By default we will not ignore result
# If you want to see results and try out tasks_old interactively, change it to False
# Or change this setting on tasks_old level
CELERY_IGNORE_RESULT = False
CELERY_SEND_TASK_ERROR_EMAILS = False
CELERY_TASK_RESULT_EXPIRES = 600
更新:当我将CELERY_ALWAY_EAGER设置为True时,则该组使用命令运行良好
group(*chain_list)()
但它在本地运行,这当然不是我想要的。
在创建链后,确保在使用group时使用不可变或可变签名。例如:
res = group(job_chain.si(), job_chain.si())()