所以我们的用例可能超出了Celery的权限,但我想我应该问。。。
用例
我们计划使用托管/托管的RabbitMQ集群支持,Celery将使用该集群作为其代理。我们希望确保我们的应用程序没有停机时间(显然),因此我们正试图弄清楚当我们的上游集群发生灾难性故障,导致整个集群不可用时,我们如何处理该事件。
我们的想法是,我们有一个备用的Rabbit集群,当连接断开时,我们可以自动切换Celery来使用该连接。
与此同时,Celery正在确定主集群是否启动并运行,当启动并运行时,所有发布者都会重新连接到主集群,工作人员会耗尽备份集群,当空闲时,切换回主集群。
问题
我遇到的困难是捕捉连接故障,因为它似乎发生在芹菜的深处,因为应用程序中没有出现异常。
我可以看到Celery有一个BROKER_FAILOVER_STRATEGY
配置属性,它将处理初始交换,但它(似乎)只在发生故障切换时使用,这不适合我们在主设备备份时交换回主设备的用例。
我也遇到过Celery的"引导步骤",但这些都是在Celery自己的"连接"引导步骤之后应用的,也就是抛出异常的地方。
考虑到我所发现的局限性,我有一种感觉,这种方法可能不是最好的方法,但有人知道我该如何覆盖默认的连接引导步骤或通过不同的方式实现这一点吗?
它很旧,但可能对某些人有用。我正在使用Celery 5.2的FastApi。
run_api.py文件:
import uvicorn
if __name__ == "__main__":
port=8893
print("Starting API server on port {}".format(port))
uvicorn.run("endpoints:app", host="localhost", port=port, access_log=False)
endpoints.py文件:
import threading
import time
import os
from celery import Celery
from fastapi import FastAPI
import itertools
import random
# Create object for fastAPI
app = FastAPI()
# Create and onfigure Celery to manage queus
# ----
celery = Celery(__name__)
celery.conf.broker_url = ["redis://localhost:6379"]
celery.conf.result_backend = "redis://localhost:6379"
celery.conf.task_track_started = True
celery.conf.task_serializer = "pickle"
celery.conf.result_serializer = "pickle"
celery.conf.accept_content = ["pickle"]
def random_failover_strategy(servers):
# The next line is necessary to work, even you don't use them:
it = list(servers) # don't modify callers list
shuffle = random.shuffle
for _ in itertools.repeat(None):
# Do whatever action required here to obtain the new url
# As an example, ra.
ra = random.randint(0, 100)
it = [f"redis://localhost:{str(ra)}"]
celery.conf.result_backend = it[0]
shuffle(it)
yield it[0]
celery.conf.broker_failover_strategy = random_failover_strategy
# Start the celery worker. I start it in a separate thread, so fastapi can run in parallel
worker = celery.Worker()
def start_worker():
worker.start()
ce = threading.Thread(target=start_worker)
ce.start()
# ----
@app.get("/", tags=["root"])
def root():
return {"message": ""}
@app.post("/test")
def test(num: int):
task = test_celery.delay(num)
print(f'task id: {task.id}')
return {
"task_id": task.id,
"task_status": "PENDING"}
@celery.task(name="test_celery", bind=True)
def test_celery(self, num):
self.update_state(state='PROGRESS')
print("ENTERED PROCESS", num)
time.sleep(100)
print("EXITING PROCESS", num)
return {'number': num}
@app.get("/result")
def result(id: str):
task_result = celery.AsyncResult(id)
if task_result.status == "SUCCESS":
return {
"task_status": task_result.status,
"task_num": task_result.result['number']
}
else:
return {
"task_status": task_result.status,
"task_num": None
}
将两个文件放在同一文件夹中。运行python3 Run_api.py.
享受吧!