我正试图弄清楚如何获取未确认消息的信息。这些东西存放在哪里?在使用芹菜检查时,似乎一旦消息得到确认,它就会处理通过,并且您可以跟踪状态。假设你有一个结果后端,那么你可以看到它的结果。但从你应用延迟到它被确认,它就在黑洞中。
- noAck存储在哪里
- 如何了解noAcks列表的"深度"?换句话说,有多少人,我的任务在列表中的哪里
虽然这与我正在处理的问题并不完全相关。
from celery.app import app_or_default
app = app_or_default()
inspect = app.control.inspect()
# Now if I want "RECEIVED" jobs..
data = inspect.reserved()
# or "ACTIVE" jobs..
data = inspect.active()
# or "REVOKED" jobs..
data = inspect.revoked()
# or scheduled jobs.. (Assuming these are time based??)
data = inspect.scheduled()
# FILL ME IN FOR UNACK JOBS!!
# data = inspect.??
# This will never work for tasks that aren't in one of the above buckets..
pprint.pprint(inspect.query_task([tasks]))
我真的很感谢你在这方面的建议和帮助。
它们是inspect.reserved()
中具有'acknowleged': False
的任务
from celery.app import app_or_default
app = app_or_default()
inspect = app.control.inspect()
# those that have been sent to a worker and are thus reserved
# from being sent to another worker, but may or may not be acknowledged as received by that worker
data = inspect.reserved()
{'celery.tasks': [{'acknowledged': False,
'args': '[]',
'delivery_info': {'exchange': 'tasks',
'priority': None,
'routing_key': 'celery'},
'hostname': 'celery.tasks',
'id': '527961d4-639f-4002-9dc6-7488dd8c8ad8',
'kwargs': '{}',
'name': 'globalapp.tasks.task_loop_tick',
'time_start': None,
'worker_pid': None},
{'acknowledged': False,
'args': '[]',
'delivery_info': {'exchange': 'tasks',
'priority': None,
'routing_key': 'celery'},
'hostname': 'celery.tasks',
'id': '09d5b726-269e-48d0-8b0e-86472d795906',
'kwargs': '{}',
'name': 'globalapp.tasks.task_loop_tick',
'time_start': None,
'worker_pid': None},
{'acknowledged': False,
'args': '[]',
'delivery_info': {'exchange': 'tasks',
'priority': None,
'routing_key': 'celery'},
'hostname': 'celery.tasks',
'id': 'de6d399e-1b37-455c-af63-a68078a9cf7c',
'kwargs': '{}',
'name': 'globalapp.tasks.task_loop_tick',
'time_start': None,
'worker_pid': None}],
'fastlane.tasks': [],
'images.tasks': [],
'mailer.tasks': []}
经过数小时对芹菜的回顾,我得出的结论是,使用纯芹菜是不可能的。然而,可以松散地跟踪整个过程。这是我用来查找未确认计数的代码。这其中的大部分都可以使用芹菜中的实用程序来完成。
我仍然无法按id查询底层未确认的任务,但
如果您安装了RabbitMQ管理插件,您可以查询API
data = {}
base_url = "http://localhost:55672"
url = base_url + "/api/queues/{}/".format(vhost)
req = requests.get(url, auth=(settings.RABBITMQ_USER, settings.RABBITMQ_PASSWORD))
if req.status_code != 200:
log.error(req.text)
else:
request_data = req.json()
for queue in request_data:
# TODO if we know what queue the task is then we can nail this.
if queue.get('name') == "celery":
data['state'] = "Unknown"
if queue.get('messages'):
data['messages'] = queue.get('messages')
data['messages_ready'] = queue.get('messages_ready')
data['messages_unacknowledged'] = queue.get('messages_unacknowledged')
break
return data