假设我在芹菜队列上的所有任务都命中了第三方API。但是,API 有一个速率限制,我正在跟踪它(我需要遵守一天限制和每小时限制(。一旦达到速率限制,我想暂停新任务的消耗,然后在我知道自己很好时继续。
我使用以下两个任务实现了此目的:
@celery.task()
def cancel_api_queue(minutes_to_resume):
resume_api_queue.apply_async(countdown=minutes_to_resume*60, queue='celery')
celery.control.cancel_consumer('third_party', reply=True)
@celery.task(default_retry_delay=300, max_retries=5)
def resume_api_queue():
celery.control.add_consumer('third_party', destination=['y@local'])
然后,我可以继续提交我的第三方 API 任务,一旦我的使用者被添加回来,我的所有任务都会被消耗掉。伟大。
但是,由于我在此队列中没有使用者,这似乎意味着我无法再看到在 Flower 中提交的作业(直到我的使用者被添加(。
我做错了什么吗?我是否可以以另一种方式实现这种"暂停",以便继续在花中看到提交的作业?
附言也许这与这个问题有关,但不是 100% 确定:https://github.com/celery/celery/issues/1452
我正在使用 amqp 代理,如果这有所作为。
谢谢女孩和男孩。
我怀疑在工作人员拿起队列消息之前偷看队列消息的内容并不是 Flower 预期设计的一部分。因此,如果您停止使用队列中的任务,Flower 能做的最好的事情就是在"代理"窗格中显示其中有多少任务已作为单个数字排队。
观察传入任务内部的一种黑客方法可能是添加一个中间虚拟"转发"任务,它只是将消息从一个队列(我们称之为query_inbox
(转发到另一个队列(例如,query_processing
(。
例如,像这样:
@celery.task(queue='query_inbox')
def query(params):
process_query.delay(params)
@celery.task(queue='query_processing')
def process_query(params):
... do rate-limited stuff ...
现在,您可以停止使用query_processing
的任务,但您仍然可以在它们流经query_inbox
工作线程时观察它们的参数。