我试图为链中的每个任务使用不同的队列。我使用下面的代码,但使用者从传入队列中获取所有消息,并将所有消息都放在队列saveRequestQueue
中,但这些消息不会被进一步处理。
chain = (
tasks.save_request_task.s(transient_schema.dict()).set(queue="saveRequestQueue")
| tasks.get_customer_summary_task.s().set(queue="getCustomerSummaryQueue")
| tasks.save_analysis_task.s().set(queue="saveAnalysisQueue")
| tasks.answer_to_credit_engine_task.s().set(queue="answerToCreditEngineTask")
)
chain()
我曾尝试在.set()
方法上使用相同的队列,它的效果非常好。但我确实需要为每个任务设置不同的队列。
所有消息都被卡在第一个队列中的原因有什么想法吗?
我在15分钟前测试了这个片段,它似乎运行得很好:
fetch_init_t = fetch_init_data.s(args).set(queue="updates")
fetch_next_t = get_next.s(next_request_args).set(queue="next")
tasks.append(chain(fetch_init_t, fetch_next_t).delay())
我将celery==5.3.1
和redis:alpine-latest
用于结果和传输。