我目前遇到了一个使用芹菜的项目的问题。当将任务路由到队列并且当任务数量超过并发性时,工作线程将脱机。在此状态下,它仍在处理任务,但工作者看起来脱机并且无法检查。ping工作线程或执行检查active_queues会给出"错误:没有节点在时间限制内回复"的消息。当任务数等于或小于该工作线程的并发数时,该工作线程将重新上线。
使用'-l debug'运行worker没有显示任何错误,因为worker脱机了。尽管无法检查,但该工作人员似乎离线时,日志仍在继续。
芹菜是3.1.11版本,代理是RabbitMQ 3.3.0。
我的芹菜配置:
from celery.schedules import crontab
from kombu import Exchange, Queue
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
Queue('manager_tasks',
exchange=Exchange('manager', type='direct'),
routing_key='manager.#'),
Queue('high_priority_tasks',
exchange=Exchange('high', type='direct'),
routing_key='high.#'),
Queue('low_priority_tasks',
exchange=Exchange('low', type='direct'),
routing_key='low.#'),
)
CELERY_DEFAULT_EXCHANGE = 'tasks'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'task.default'
BROKER_URL = 'amqp://[user]:[password]@[url]:[port]/[vhost]'
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
CELERYBEAT_SCHEDULE = {
'scan-hosts': {
'task': 'myapp.task',
'schedule': crontab(minute='*/1',
hour='8-17',
day_of_week='mon-fri'),
'options': {
'queue': 'manager_tasks',
'routing_key': 'manager.#'
},
},
}
CELERY_TASK_RESULT_EXPIRES = 18000 # 5 hours
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
从我发现的情况来看,worker似乎脱机了,这是一种表示它已满并且无法预取任何更多任务的信号。一旦它能够接收更多的任务,它就会恢复,一旦满足并发性和预取限制,它就会回到"离线"状态。