I有1名芹菜经纪人和几名芹菜工人,他们都在与rabbitMQ沟通。在我的设置中,我向芹菜工人发送了几个任务,他们会处理所有任务(大约需要1个小时(,然后我会手动终止我的芹菜工人。
我想转向这样一个系统,在该系统中,如果一个芹菜工作者id为"idle"(我将其定义为:在timeout_seconds
的时间段内有0个活动任务,我将预先定义(,该工作者将被程序终止。所有工人将有大约相同数量的任务要运行,并且都将在同一时间"空闲"。
我设置了代码,可以终止工作人员,但我不知道如何检测工作人员"空闲"并准备终止。我想我想使用一个信号,但看起来没有一个符合我的要求
在我工作的地方,我们有一项任务,基本上是做你想做的事情-根据"情况";。这个过程的关键是Celery检查/控制API,所以我建议你熟悉它。这是一个没有很好记录的领域,所以从以下开始:
insp = celery_app.control.inspect()
active_queues = insp.active_queues()
# Note: between these two calls some nodes may shut down and disappear
# from the dictionary so may need to deal with this...
active_stats = insp.active()
当您的Celery集群运行任务时,您可以在单独的IPython会话中执行此操作,并查看其中的内容。。。