我正在我的django
应用程序中启动chord
,以响应请求。 和弦正确执行,但 django 永远不会释放 pub-sub 通道。 杀死 django 服务器会释放通道,然后它从redis-cli pubsub channels
中消失。
- 芹菜 4.1.1 或 4.2.0rc4
- 雷迪斯 4.0.9
- 蟒蛇 2.7.15
- 本地运行,1 个芹菜工人,1 个 API 服务器
- 在这种情况下,结果并不重要(但文档说不要忽略它们(
- 完整示例项目位于: https://github.com/awbacker/celerychord-issue
点击/api/start/
并在运行芹菜的选项卡中查看任务完成后,我看到还剩 5 个频道。 杀死姜戈会移除通道,杀死芹菜工人对它们没有影响。
redis-cli pubsub channels
1) "celery-task-meta-chord-lphsmq-chunk-4-14"
2) "celery-task-meta-chord-lphsmq-chunk-2-12"
3) "celery-task-meta-chord-lphsmq-chunk-3-13"
4) "celery-task-meta-chord-lphsmq-chunk-1-11"
5) "celery-task-meta-chord-lphsmq-chunk-0-10"
我看到当一切正常时,通道仍然存在,因此不会抛出任何错误。
谁能看到我做错了什么? 我知道芹菜中报告了一些问题,但我不确定这是否源于它们:
- https://github.com/celery/celery/issues/3812
- https://github.com/celery/celery/issues/4761
法典:
# --- endpoint.py -------------------------------------------
chord_key = get_random_string(6, string.ascii_lowercase)
all_tasks = celery.chord(
task_id="chord-%s" % chord_key,
header=celery.group(
tasks.process_chunk.subtask(args=(x,), task_id="chord-%s-chunk-%s-%s" % (chord_key, i, x))
for i, x in enumerate(range(10, 15))
),
# immutable = ignore results from parent
body=celery.chain(
tasks.post_step_1.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
tasks.post_step_2.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
)
)
result = all_tasks.apply_async()
return Response(data=dict(chord_key=chord_key, result=repr(result)))
# --- tasks.py ----------------------------------------------
@celery_app.task(bind=True, ignore_result=False)
def process_chunk(self, x):
logging.error(" ~ executing process-chunk: %s" % x)
return x * 2
@celery_app.task(bind=True, ignore_result=False)
def post_step_1(self, y):
logging.error(" ~ executing post-step-1")
return y * 3
@celery_app.task(bind=True, ignore_result=False)
def post_step_2(self, z):
logging.error(" ~ executing post-step-2")
return z * 5
你的和弦看起来很复杂,也许这就是芹菜很难过的原因,我建议你自己实现和弦逻辑,它不是很复杂。 试试这个...我基本上是在等待使用和弦机制的任务
# --- endpoint.py -------------------------------------------
chain_tasks = celery.chain(
tasks.post_step_1.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
tasks.post_step_2.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True)
.apply_async()
chain_result= chain_tasks.get() // WAIT TO FINISH
group_task = celery.group(tasks.process_chunk.subtask(args=(chain_result,), task_id="chord-%s-chunk-%s-%s" % (chord_key, i, x))
for i, x in enumerate(range(10, 15)).apply_async()
group_result = group_task.get()
return Response(data=dict(chord_key=chord_key, result=repr(group_result)))
不确定这是否正是您要实现的目标,但我认为只需进行一些调整即可。 祝你好运。