有一个跟踪器类,它只通过redis计算success
、failed
、pending
和started
任务。
目标是扩展Celery,使其工作人员可以访问group_id
并保留组的统计信息。我希望有一个类似的界面:
def on_important_event(...):
group_id=uuid4()
for _ in range(count_of_jobs):
my_task.apply_async(..., group_id=group_id)
自定义任务类看起来像:
class MyTask(Task):
# declaring group_id somehow
def apply_async(...):
get_tracker(self.request.group_id).task_pending()
...
def before_start(...):
get_tracker(self.request.group_id).task_started()
...
def on_success(...):
get_tracker(self.request.group_id).task_success()
...
def on_failure(...):
get_tracker(self.request.group_id).task_failed()
...
我找不到实现该类的方法,因此它将通过AMQP
正确地保存和接收自定义属性。
UPD,以明确:
问题是将Tasks的一些调用标记为组的参与者。所以我可以跟踪小组而不是一般的任务或单个呼叫。
在我看来,必须有一种方法为Task添加一个属性,该属性将保存到Queue中,然后由Celery的工作人员接收,这样我就可以在Task类层上访问它。
我推荐一种不同的方法-编写一个自定义监视器(查看官方Celery文档中的Monitoring API文档(。一个好的起点:实时处理。
这基本上就是Flower和Leek的工作方式。