我们在烧瓶中使用芹菜。我们的各种任务有很多队列,我们使用 supervisord 来运行队列。我们使用 cloudamqp代理。
管理引擎配置示例如下所示:
[program:my-queue]
command=/home/ubuntu/opt/proect/venv/bin/celery -A async_runner worker -Q my_queue --loglevel=INFO --without-gossip --without-mingle --autoscale=1,1 -c 1
environment=PYTHONPATH=/home/ubuntu/opt/project/,PRODUCTION_ENVIRONMENT=true
directory=/home/ubuntu/opt/project/app
process_name = %(program_name)s_%(process_num)02d
user=ubuntu
numprocs=2
autostart=true
autorestart=true
startsecs=10
stopwaitsecs = 600
priority=998
我们收到以下错误,队列停止。
[2017-04-06 12:43:06,759: WARNING/MainProcess] /home/ubuntu/opt/project/venv/local/lib/python2.7/site-packages/kombu/pidbox.py:75: UserWarning: A node named celery@ip-xxx-yy-yy-yy is already using this process mailbox!
Maybe you forgot to shutdown the other node or did not do so properly?
Or if you meant to start multiple nodes on the same host please make sure
you give each node a unique node name!
warnings.warn(W_PIDBOX_IN_USE.format(node=self))
问题:如何为每个节点命名?
当我运行celery -A async_runner status
时,它会给出 5 个节点的在线消息。
celery@ip-ip_value_here-: OK
celery@ip-ip_value_here-: OK
celery@ip-ip_value_here-: OK
celery@ip-ip_value_here-: OK
celery@ip-ip_value_here-: OK
5 nodes online.
以下是我们如何为公司中的每个队列运行一个芹菜工人。入口点包含以下内容:
echo QUEUES: ${QUEUES} # comma separated list of queue names
export NUM_QUEUES=$(python -c "print len('$QUEUES'.split(','))")
echo NUM_QUEUES: ${NUM_QUEUES}
supervisord &
主管配置文件具有以下内容:
[program:worker]
command=/path/to/celery_worker.sh %(process_num)s
# supervisor uses the special ENV_ prefix to get the environment variable set above
numprocs=%(ENV_NUM_QUEUES)s
最后celery_worker.sh
包含以下内容:
QUEUE=$(python -c "print '$QUEUES'.split(',')[$1]") # $1 = process_num above
celery worker -n "worker.${QUEUE}" -Q ${QUEUE} # -n sets the name of the worker
使用选项 -n SOMENODENAME@%%h
。@%%h
将确保每个工作线程都有一个唯一的节点名称