如何确定在芹菜任务的名称?



我有一个fastAPI应用程序,我想调用芹菜任务我不能导入任务,因为它们是在两个不同的代码库。所以我必须使用它的名字来调用它

intasks.py

imagery = Celery(
"imagery", broker=os.getenv("BROKER_URL"), backend=os.getenv("REDIS_URL")
)
...
@imagery.task(bind=True, name="filter")
def filter_task(self, **kwargs) -> Dict[str, Any]:
print('running task')

celery worker正在运行如下命令:

celery worker -A worker.imagery -P threads --loglevel=INFO --queues=imagery

现在,在我的FastAPI代码库中,我想运行过滤器任务。所以我的理解是,我必须使用celery。send_task()函数

app.py中我有

from celery import Celery, states
from celery.execute import send_task
from fastapi import FastAPI
from starlette.responses import JSONResponse, PlainTextResponse
from app import models
app = FastAPI()
tasks = Celery(broker=os.getenv("BROKER_URL"), backend=os.getenv("REDIS_URL"))

@app.post("/filter", status_code=201)
async def upload_images(data: models.FilterProductsModel):
"""
TODO: use a celery task(s) to query the database and upload the results to S3
"""
data = ['ok', 'un test']
data = ['ok', 'un test']
result = tasks.send_task('workers.imagery.filter', args=list(data))
return PlainTextResponse(f"here is the id: {str(result.ready())}")

调用/filter端点后,我没有看到工作器拾取任何任务。所以我在send_task()

中尝试了不同的名称<<ul>
  • 过滤器/gh>
  • imagery.filter
  • worker.imagery.filter
  • 为什么我的任务从来没有被工人捡起,没有显示在日志中?我的任务名错了吗?

    编辑:工作进程在docker中运行。这是文件在其磁盘上的完整路径。

    • tasks.py:/workers/worker.py

    如果我遵循导入模式。任务的名称应该是workers.worker.filter,但这不起作用,docker的日志中没有打印任何内容。是一个打印应该出现在STDOUT的芹菜cli?

    您的Celery worker只订阅了imagery队列。另一方面,您尝试使用result = tasks.send_task('workers.imagery.filter', args=list(data))将任务发送到默认队列(如果没有更改配置,则该队列的名称为celery)。你没有看到任务被你的worker执行,这并不奇怪,因为你一直在向默认队列发送任务。

    要解决此问题,请尝试以下操作:

    result = tasks.send_task('workers.imagery.filter', args=list(data), queue='imagery')
    

    OP Here

    这是我使用的解决方案。

    task = signature("filter", kwargs=data.dict() ,queue="imagery")
    res = task.delay()
    

    正如@DejanLekic提到的,我必须指定队列。

    相关内容

    最新更新