我试图写一个基本任务类,将检查工作人员的可用性之前,它做一些事情。通过阅读这篇文章,我得出了以下结论:
class BaseTask(Task):
def apply_async(self, *args, **kwargs):
if not celery.control.inspect().stats():
raise Exception("workersDown")
Task.apply_async(self, *args, **kwargs)
然而,这似乎只在第一次起作用。我知道任务不是每次实例化,但它是相关的?还有别的方法可以达到我的目的吗?
编辑:我发现将基本任务设置为抽象有帮助,但仍然会产生一些误报(有时异常会引发,尽管工人是向上的):
class AnotherTask(Task):
abstract = True
def apply_async(self, *args, **kwargs):
if not celery.control.inspect().stats():
raise Exception("workersDown")
Task.apply_async(self, *args, **kwargs)
对于给定的芹菜应用实例,只能调用一次inspect()
。下面是您可以在任务中使用的小代码片段:
from celery import Celery
def inspect(method):
app = Celery('app', broker='amqp://')
return getattr(app.control.inspect(), method)()
class BaseTask(Task):
def apply_async(self, *args, **kwargs):
if not inspect('stats'):
raise Exception("workersDown")
Task.apply_async(self, *args, **kwargs)