我已经使用芹菜一段时间了,在生产中,我使用RabbitMQ作为代理,在K8s集群中使用Redis作为后端,到目前为止还没有问题。在本地,我用几个服务(Flask API, 2个不同的worker, Beat, Redis, Flower, Hasura)运行一个docker组合,使用Redis作为Broker和Backend。
在过去的几个月里,我没有遇到这种设置的问题,但是昨天我在访问任务结果时开始出现不稳定的行为。
任务被发送到队列,工作器识别它并执行任务,但在查询任务状态时,我有时会得到DisabledBackend
。通常在第一个请求时,然后它就工作了。找不到它什么时候起作用,什么时候不起作用的模式,它是不稳定的。
我在某个地方读到芹菜没有很好地与flask的内置服务器一起工作,所以我切换到uWSGI与我在生产中几乎相同的设置:
[uwsgi]
wsgi-file = app/uwsgi.py
callable = application
http = :8080
processes = 4
threads = 2
master = true
chmod-socket = 660
vacuum = true
die-on-term = true
buffer-size = 32768
enable-threads = true
req-logger = python:uwsgi
我在Django中看到过类似的问题,其中问题似乎是在Apache的WSGI Mod上,这不是我的情况,但行为似乎相似。我看到的其他问题都与后端配置错误有关,但我的情况并非如此。
你知道是什么原因导致的吗?谢谢。
所以似乎我需要访问AsyncResult
仅通过我的芹菜应用程序实例,而不是通过芹菜,或传递芹菜应用程序实例作为参数。
所以,这不起作用:
from celery.result import AsyncResult
@app.route('/status/<task_id>')
def get_status(task_id):
task = AsyncResult(task_id)
return task.state
如此:
from app import my_celery # Your own Celery Application Instance
@app.route('/status/<task_id>')
def get_status(task_id):
task = my_celery.AsyncResult(task_id)
return task.state
from app import my_celery
from celery.result import AsyncResult
@app.route('/status/<task_id>')
def get_status(task_id):
task = AsyncResult(task_id, app=my_celery)
return task.state
我猜发生的事情是通过直接从芹菜调用AsyncResult
,它不访问芹菜的配置,因此它认为没有后端配置来查询结果。
但这只能解释函数的完全失效,而不能解释不稳定的行为。我猜这是因为不同的线程,在其中的应用程序实例是重要的情况下,所以芹菜发现它,不太确定,虽然。
我已经运行了几个测试,并且在更改导入的AsyncResult
后似乎再次工作良好,但我会继续挖掘。