我正试图找到一种可靠且可测试的方法来获取给定Celery队列中包含的任务数,这让我失去了理智。
我已经阅读了这两个相关的讨论:
-
Django Celery获取任务计数
注意:我没有使用Django,也没有使用任何其他Python web框架
-
检索Celery 中队列中的任务列表
但我无法使用这些线程中描述的方法解决我的问题。
我使用Redis作为后端,但我希望有一个独立于后端且灵活的解决方案,尤其是对于测试。
这是我目前的情况:我定义了一个EnhancedCelery
类,它继承了Celery
,并添加了几个方法,特别是get_queue_size()
是我正在尝试正确实现/测试的方法。
以下是我的测试用例中的代码:
celery_test_app = EnhancedCelery(__name__)
# this is needed to avoid exception for ping command
# which is automatically triggered by the worker once started
celery_test_app.loader.import_module('celery.contrib.testing.tasks')
# in memory backend
celery_test_app.conf.broker_url = 'memory://'
celery_test_app.conf.result_backend = 'cache+memory://'
# We have to setup queues manually,
# since it seems that auto queue creation doesn't work in tests :(
celery_test_app.conf.task_create_missing_queues = False
celery_test_app.conf.task_default_queue = 'default'
celery_test_app.conf.task_queues = (
Queue('default', routing_key='task.#'),
Queue('queue_1', routing_key='q1'),
Queue('queue_2', routing_key='q2'),
Queue('queue_3', routing_key='q3'),
)
celery_test_app.conf.task_default_exchange = 'tasks'
celery_test_app.conf.task_default_exchange_type = 'topic'
celery_test_app.conf.task_default_routing_key = 'task.default'
celery_test_app.conf.task_routes = {
'sample_task': {
'queue': 'default',
'routing_key': 'task.default',
},
'sample_task_in_queue_1': {
'queue': 'queue_1',
'routing_key': 'q1',
},
'sample_task_in_queue_2': {
'queue': 'queue_2',
'routing_key': 'q2',
},
'sample_task_in_queue_3': {
'queue': 'queue_3',
'routing_key': 'q3',
},
}
@celery_test_app.task()
def sample_task():
return 'sample_task_result'
@celery_test_app.task(queue='queue_1')
def sample_task_in_queue_1():
return 'sample_task_in_queue_1_result'
@celery_test_app.task(queue='queue_2')
def sample_task_in_queue_2():
return 'sample_task_in_queue_2_result'
@celery_test_app.task(queue='queue_3')
def sample_task_in_queue_3():
return 'sample_task_in_queue_3_result'
class EnhancedCeleryTest(TestCase):
def test_get_queue_size_returns_expected_value(self):
def add_task(task):
task.apply_async()
with start_worker(celery_test_app):
for _ in range(7):
add_task(sample_task_in_queue_1)
for _ in range(4):
add_task(sample_task_in_queue_2)
for _ in range(2):
add_task(sample_task_in_queue_3)
self.assertEqual(celery_test_app.get_queue_size('queue_1'), 7)
self.assertEqual(celery_test_app.get_queue_size('queue_2'), 4)
self.assertEqual(celery_test_app.get_queue_size('queue_3'), 2)
以下是我实现get_queue_size()
:的尝试
这总是返回零(
jobs == 0
(:def get_queue_size(self, queue_name: str) -> Optional[int]: with self.connection_or_acquire() as connection: channel = connection.default_channel try: name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True) return jobs except (ChannelError, NotFound): pass
这也总是返回零:
def get_queue_size(self, queue_name: str) -> Optional[int]: inspection = self.control.inspect() return inspection.active() # zero! # or: return inspection.scheduled() # zero! # or: return inspection.reserved() # zero!
这是通过返回每个队列的预期数量来实现的,但仅在测试环境中,因为使用redis后端时不存在
channel.queues
属性:def get_queue_size(self, queue_name: str) -> Optional[int]: with self.connection_or_acquire() as connection: channel = connection.default_channel if hasattr(channel, 'queues'): queue = channel.queues.get(queue_name) if queue is not None: return queue.unfinished_tasks
在我看来,你提到的解决方案都不完全正确。正如您已经提到的,这是特定于后端的,因此您必须包装Celery支持的所有后端的处理程序,以提供后端不可知的队列检查。在Redis的情况下,您必须直接连接到Redis和LLEN您要检查的队列。在RabbitMQ的情况下,您可以以完全不同的方式找到这些信息。与SQS的故事相同。。。
这一切都在Celery线程中的检索队列中的任务列表中进行了讨论。。。
最后,Celery没有提供开箱即用的功能是有原因的——我认为这些信息毫无用处。当你得到队列中的东西时,它可能已经是空的了!
如果你想监控你的排队情况,我建议你采取另一种方法。-编写自己的实时监视器。该示例仅捕获任务失败事件,但您应该能够轻松地对其进行修改,以捕获您关心的所有事件,并收集有关这些任务的数据(队列、时间、执行任务的主机等(。显然,这是一个在更严肃的项目中如何做到这一点的例子。
您可以在Flower(Celery的实时监视器(中看到它是如何实现的。
另一种方法-使用芹菜的任务事件:计算发送了多少任务以及成功/失败了多少任务