我是Python和Celery-Redis的新手,所以如果我的理解不正确,请纠正我。
我一直在调试一个代码库,其结构如下 -
TaskClass
-> 芹菜任务
HandlerClass1, HandlerClass2
-> 这些是扩展对象类的 python 类
该应用程序创建TaskClass
比如说dumyTask
实例,dumyTask
创建芹菜子任务(我相信这些子任务是唯一的),比如dumySubTask1, dumySubTask2
通过获取处理程序的签名。
我无法理解什么?
1)芹菜如何管理dumySubTask1, dumySubTask2
和dumyTask
的结果?我的意思是dumySubTask1
和dumySubTask2
的结果应该汇总并作为dumyTask
的结果给出。芹菜-雷迪斯是如何做到这一点的?
2)一旦任务被执行,芹菜如何在后端存储任务结果?我的意思是dumySubTask1
和dumySubTask2
的结果会存储在后端,然后将结果返回到dumyTask
然后dumyTask
结果返回给 QUEUE(如果我错了,请纠正)?
3) Celery 是否将任务和子任务维护为STACK?请参阅快照。任务-子任务树
任何指导都非常感谢。谢谢。
芹菜工人可以调用"任务"。这个"任务"可以有"子任务",这些子任务可以"链接"在一起,即按顺序调用。"链"是芹菜帆布指南中专门使用的术语。然后,结果将返回到 redis 中的队列。
芹菜工人用于调用主要用于"网络用例"的"独立任务",即"发送电子邮件","点击URL">
您需要使用
task = app_celery.AsyncResult(task_id)
完整示例 - 如下
我的celery_worker.py
文件是:
import os
import time
from celery import Celery
from dotenv import load_dotenv
load_dotenv(".env")
celery = Celery(__name__)
celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL")
celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND")
@celery.task(name="create_task")
def create_task(a, b, c):
print(f"Executing create_task it will take {a}")
[print(i) for i in range(100)]
time.sleep(a)
return b + c
我正在使用 FastAPI,我的端点是:
# To execute the task
@app.get("/sum")
async def root(sleep_time: int, first_number: int, second_number: int):
process = create_task.delay(sleep_time, first_number, second_number)
return {"process_id": process.task_id, "result": process.result}
# To get the task status and result
from celery_worker import create_task, celery
@app.get("/task/{task_id}")
async def check_task_status(task_id: str):
task = celery.AsyncResult(task_id)
return {"status": task.status, "result": task.result}
我的.env
文件有:
CELERY_BROKER_URL=redis://redis:6379/0
CELERY_RESULT_BACKEND=redis://redis:6379/0