我在supervisorord中设置了一个环境变量:
[program:worker]
directory = /srv/app/
command=celery -A tasks worker -Q default -l info -n default_worker.%%h
environment=BROKER="amqp://admin:password@xxxxx:5672//"
然后,在我的celeryconfig.py中,我尝试读取这样的变量。
BROKER = os.environ['BROKER']
但是我仍然得到了下面的错误,为什么?
File "/usr/local/lib/python2.7/dist-packages/celery/loaders/base.py", line 106, in import_module
return importlib.import_module(module, package=package)
File "/usr/lib/python2.7/importlib/__init__.py", line 37, in import_module
__import__(name)
File "/srv/app/celeryconfig.py", line 6, in <module>
BROKER = os.environ['BROKER']
File "/usr/lib/python2.7/UserDict.py", line 23, in __getitem__
raise KeyError(key)
KeyError: 'BROKER
正如评论中所建议的,有一个envs的文件转储:
{
'SUPERVISOR_GROUP_NAME': 'celery_default_worker',
'TERM': 'linux',
'SUPERVISOR_SERVER_URL': 'unix: ///var/run/supervisor.sock',
'UPSTART_INSTANCE': '',
'RUNLEVEL': '2',
'UPSTART_EVENTS': 'runlevel',
'PREVLEVEL': 'N',
'SUPERVISOR_PROCESS_NAME': 'celery_default_worker',
'UPSTART_JOB': 'rc',
'PWD': '/',
'SUPERVISOR_ENABLED': '1',
'runlevel': '2',
'PATH': '/usr/local/sbin: /usr/local/bin: /sbin: /bin: /usr/sbin: /usr/bin',
'previous': 'N'
}
它看起来像是supervisord
:中的一个已知错误
http://github.com/Supervisor/supervisor/issues/91
http://github.com/Supervisor/supervisor/pull/550(待定)
在这种情况下,将您的环境规范移动到全局范围(对于supervisord进程本身)可能是一个可以接受的解决方法。
最后,如果所有其他操作都失败了,请将celery
封装在接受此特定环境变量作为命令行参数的shell脚本中。
这个答案很可能不是原因,请检查https://stackoverflow.com/a/28829162/1589147以获取有关相关supervisord错误的信息
我可以部分重现你的错误。当芹菜在主管范围内运行时,我没有看到错误。当我尝试从没有设置BROKER
环境变量的外部主管环境运行任务时,我看到了错误。CCD_ 4由芹菜和任何试图执行任务的东西执行。
我不确定这个问题是否正是你遇到的问题,如果你能分享你是如何执行任务的,以及当出现异常时,它可能会有所帮助。
例如,如果我尝试从ipython
运行任务,则会生成与您的错误相匹配的错误。
In [1]: from tasks import add
In [2]: add.delay(2,3)
...
21 if hasattr(self.__class__, "__missing__"):
22 return self.__class__.__missing__(self, key)
---> 23 raise KeyError(key)
24 def __setitem__(self, key, item): self.data[key] = item
25 def __delitem__(self, key): del self.data[key]
KeyError: 'BROKER'
celeryconfig.py
是在本地加载的,以便建立到芹菜代理和后端的连接。如果不设置BROKER
环境变量,我将无法执行任务。
如果我在执行任务之前设置了环境变量,那么同样的代码也适用于我
In [3]: import os
In [4]: os.environ["BROKER"] = "broker is set"
In [5]: add.delay(2,3)
Out[5]: <AsyncResult: 0f3xxxx-87fa-48d7-9258-173bdd2052ca>
以下是我使用的文件,以防有帮助。
supervisor.conf
:supervisord -c supervisor.conf
[unix_http_server]
file=/tmp/supervisor.sock
[supervisord]
loglevel = info
nodaemon = true
identifier = supervisor
[supervisorctl]
serverurl=unix:///tmp/supervisor.sock
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[program:worker]
command=/app/srv/main-env/bin/celery -A tasks worker -Q default -l info -n default_worker.%%h
environment=BROKER="amqp://admin:password@xxxxx:5672//"
directory=/app/srv/
numprocs=1
stdout_logfile=/app/srv/worker.log
stderr_logfile=/app/srv/worker.log
autostart=true
autorestart=true
startsecs=10
stopwaitsecs = 600
killasgroup=true
priority=998
celeryconfig.py
:
import os
BROKER = os.environ['BROKER']
tasks.py
:
from celery import Celery
app = Celery(
'tasks',
backend='amqp',
broker='amqp://admin:password@xxxxx:5672//')
app.config_from_object('celeryconfig')
@app.task
def add(x, y):
return x + y