Celery:与Queue共享的自定义任务/请求属性



有一个跟踪器类,它只通过redis计算successfailedpendingstarted任务。

目标是扩展Celery,使其工作人员可以访问group_id并保留组的统计信息。我希望有一个类似的界面:

def on_important_event(...):
group_id=uuid4()
for _ in range(count_of_jobs):
my_task.apply_async(..., group_id=group_id)

自定义任务类看起来像:

class MyTask(Task):
# declaring group_id somehow
def apply_async(...):
get_tracker(self.request.group_id).task_pending()
...
def before_start(...):
get_tracker(self.request.group_id).task_started()
...
def on_success(...):
get_tracker(self.request.group_id).task_success()
...
def on_failure(...):
get_tracker(self.request.group_id).task_failed()
...

我找不到实现该类的方法,因此它将通过AMQP正确地保存和接收自定义属性。


UPD,以明确:

问题是将Tasks的一些调用标记为组的参与者。所以我可以跟踪小组而不是一般的任务或单个呼叫。

在我看来,必须有一种方法为Task添加一个属性,该属性将保存到Queue中,然后由Celery的工作人员接收,这样我就可以在Task类层上访问它。

我推荐一种不同的方法-编写一个自定义监视器(查看官方Celery文档中的Monitoring API文档(。一个好的起点:实时处理。

这基本上就是Flower和Leek的工作方式。

最新更新