芹菜店如何任务结果在 Redis 中



我是Python和Celery-Redis的新手,所以如果我的理解不正确,请纠正我。

我一直在调试一个代码库,其结构如下 -

TaskClass-> 芹菜任务

HandlerClass1, HandlerClass2-> 这些是扩展对象类的 python 类

该应用程序创建TaskClass比如说dumyTask实例,dumyTask创建芹菜子任务(我相信这些子任务是唯一的),比如dumySubTask1, dumySubTask2通过获取处理程序的签名。

我无法理解什么?

1)芹菜如何管理dumySubTask1, dumySubTask2dumyTask的结果?我的意思是dumySubTask1dumySubTask2的结果应该汇总并作为dumyTask的结果给出。芹菜-雷迪斯是如何做到这一点的?

2)一旦任务被执行,芹菜如何在后端存储任务结果?我的意思是dumySubTask1dumySubTask2的结果会存储在后端,然后将结果返回到dumyTask然后dumyTask结果返回给 QUEUE(如果我错了,请纠正)

3) Celery 是否将任务和子任务维护为STACK?请参阅快照。任务-子任务树

任何指导都非常感谢。谢谢。

芹菜工人可以调用"任务"。这个"任务"可以有"子任务",这些子任务可以"链接"在一起,即按顺序调用。"链"是芹菜帆布指南中专门使用的术语。然后,结果将返回到 redis 中的队列。

芹菜工人用于调用主要用于"网络用例"的"独立任务",即"发送电子邮件","点击URL">

您需要使用

task = app_celery.AsyncResult(task_id)

完整示例 - 如下

我的celery_worker.py文件是:

import os
import time
from celery import Celery
from dotenv import load_dotenv
load_dotenv(".env")
celery = Celery(__name__)
celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL")
celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND")

@celery.task(name="create_task")
def create_task(a, b, c):
print(f"Executing create_task it will take {a}")
[print(i) for i in range(100)]
time.sleep(a)
return b + c

我正在使用 FastAPI,我的端点是:

# To execute the task
@app.get("/sum")
async def root(sleep_time: int, first_number: int, second_number: int):
process = create_task.delay(sleep_time, first_number, second_number)
return {"process_id": process.task_id, "result": process.result}

# To get the task status and result
from celery_worker import create_task, celery
@app.get("/task/{task_id}")
async def check_task_status(task_id: str):
task = celery.AsyncResult(task_id)
return {"status": task.status, "result": task.result}

我的.env文件有:

CELERY_BROKER_URL=redis://redis:6379/0
CELERY_RESULT_BACKEND=redis://redis:6379/0

相关内容

  • 没有找到相关文章

最新更新