Celery with different queues



I have a project where I use one file (main.py) to start my FastAPI app:

import uvicorn
from configuration import API_HOST, API_PORT

if __name__ == "__main__":
    uvicorn.run("endpoints:app", host=API_HOST, port=API_PORT, reload=True, access_log=False)

Inside endpoints.py I have:

import os
import threading
import time

from celery import Celery
from fastapi import FastAPI

# Create the FastAPI application
app = FastAPI(
    title="MYFASTAPI",
    description="MYDESCRIPTION",
    version="1.0",
    contact={"name": "ME!"},
)

celery = Celery(__name__)
celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379")
celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379")
celery.conf.task_track_started = True
celery.conf.task_serializer = "pickle"
celery.conf.result_serializer = "pickle"
celery.conf.accept_content = ["pickle"]
# By default Celery uses as many worker processes as the instance has CPU cores.
celery.conf.worker_concurrency = os.cpu_count()

# Start the Celery worker. I start it in a separate thread, so FastAPI can run in parallel.
worker = celery.Worker()

def start_worker():
    worker.start()

ce = threading.Thread(target=start_worker)
ce.start()

@app.post("/taskA")
def taskA():
    task = ask_taskA.delay()
    return {"task_id": task.id}

@celery.task(name="ask_taskA", bind=True)
def ask_taskA(self):
    time.sleep(100)

@app.post("/get_results")
def get_results(task_id):
    task_result = celery.AsyncResult(task_id)
    return {"task_status": task_result.status}

Given this code, how can I have two different queues, assign a specific number of workers to each queue, and route specific tasks to one of them?

I've read that people usually launch Celery with:

celery -A proj worker

But the structure of this project, because of some imports, restricts me, so I ended up starting the Celery worker in a separate thread instead (which works perfectly well).

Based on the official Celery documentation, https://docs.celeryq.dev/en/stable/userguide/routing.html#manual-routing, you can specify different queues as follows (note that in your code the Celery app is named `celery` rather than `app`):

from kombu import Queue

app.conf.task_default_queue = 'default'
app.conf.task_queues = (
Queue('default',    routing_key='task.#'),
Queue('feed_tasks', routing_key='feed.#'),
)
app.conf.task_default_exchange = 'tasks'
app.conf.task_default_exchange_type = 'topic'
app.conf.task_default_routing_key = 'task.default'
