Celery/RabbitMQ-查找无确认-未确认消息



我正试图弄清楚如何获取未确认消息的信息。这些东西存放在哪里?在使用芹菜检查时,似乎一旦消息得到确认,它就会处理通过,并且您可以跟踪状态。假设你有一个结果后端,那么你可以看到它的结果。但从你应用延迟到它被确认,它就在黑洞中。

  1. noAck存储在哪里
  2. 如何了解noAcks列表的"深度"?换句话说,有多少人,我的任务在列表中的哪里

虽然这与我正在处理的问题并不完全相关。

from celery.app import app_or_default
app = app_or_default()
inspect = app.control.inspect()
# Now if I want "RECEIVED" jobs.. 
data = inspect.reserved()
# or "ACTIVE" jobs.. 
data = inspect.active()
# or "REVOKED" jobs.. 
data = inspect.revoked()
# or scheduled jobs.. (Assuming these are time based??)
data = inspect.scheduled()
# FILL ME IN FOR UNACK JOBS!!
# data = inspect.??
# This will never work for tasks that aren't in one of the above buckets..
pprint.pprint(inspect.query_task([tasks]))

我真的很感谢你在这方面的建议和帮助。

它们是inspect.reserved()中具有'acknowleged': False 的任务

from celery.app import app_or_default
app = app_or_default()
inspect = app.control.inspect()
# those that have been sent to a worker and are thus reserved
# from being sent to another worker, but may or may not be acknowledged as received by that worker
data = inspect.reserved()
{'celery.tasks': [{'acknowledged': False,
               'args': '[]',
               'delivery_info': {'exchange': 'tasks',
                                 'priority': None,
                                 'routing_key': 'celery'},
               'hostname': 'celery.tasks',
               'id': '527961d4-639f-4002-9dc6-7488dd8c8ad8',
               'kwargs': '{}',
               'name': 'globalapp.tasks.task_loop_tick',
               'time_start': None,
               'worker_pid': None},
              {'acknowledged': False,
               'args': '[]',
               'delivery_info': {'exchange': 'tasks',
                                 'priority': None,
                                 'routing_key': 'celery'},
               'hostname': 'celery.tasks',
               'id': '09d5b726-269e-48d0-8b0e-86472d795906',
               'kwargs': '{}',
               'name': 'globalapp.tasks.task_loop_tick',
               'time_start': None,
               'worker_pid': None},
              {'acknowledged': False,
               'args': '[]',
               'delivery_info': {'exchange': 'tasks',
                                 'priority': None,
                                 'routing_key': 'celery'},
               'hostname': 'celery.tasks',
               'id': 'de6d399e-1b37-455c-af63-a68078a9cf7c',
               'kwargs': '{}',
               'name': 'globalapp.tasks.task_loop_tick',
               'time_start': None,
               'worker_pid': None}],
 'fastlane.tasks': [],
 'images.tasks': [],
 'mailer.tasks': []}

经过数小时对芹菜的回顾,我得出的结论是,使用纯芹菜是不可能的。然而,可以松散地跟踪整个过程。这是我用来查找未确认计数的代码。这其中的大部分都可以使用芹菜中的实用程序来完成。

我仍然无法按id查询底层未确认的任务,但

如果您安装了RabbitMQ管理插件,您可以查询API

    data = {}
    base_url = "http://localhost:55672"
    url = base_url + "/api/queues/{}/".format(vhost)
    req = requests.get(url, auth=(settings.RABBITMQ_USER, settings.RABBITMQ_PASSWORD))
    if req.status_code != 200:
        log.error(req.text)
    else:
        request_data = req.json()
        for queue in request_data:
            # TODO if we know what queue the task is then we can nail this.
            if queue.get('name') == "celery":
                data['state'] = "Unknown"
                if queue.get('messages'):
                    data['messages'] = queue.get('messages')
                    data['messages_ready'] = queue.get('messages_ready')
                    data['messages_unacknowledged'] = queue.get('messages_unacknowledged')
                break
    return data 

相关内容

  • 没有找到相关文章

最新更新