我正在尝试创建一些芹菜任务作为类,但是有一些困难。课程是:
class BaseCeleryTask(app.Task):
def is_complete(self):
""" default method for checking if celery task has completed. """
# simply return result (since by default tasks return boolean indicating completion)
try:
return self.result
except AttributeError:
logger.error('Result not defined. Make sure task has run!')
return False
class MacroReportTask(BaseCeleryTask):
def run(self, params):
""" Override the default run method with signal factory run"""
# hold on to the factory
process = MacroCountryReport(params)
self.result = process.run()
return self.result
但是,当我初始化应用程序并检查app.tasks(或运行工作者)时,应用似乎在其注册表中没有上述任务。其他基于功能的任务(using app.task() decorator
)似乎已注册罚款。
我将上述任务运行为:
process = SignalFactoryTask()
process.delay(params)
芹菜工人错误,并带有以下消息: Received unregistered task of type None
。
我认为我遇到的问题是:如何像常规的基于函数的任务一样,将自定义类添加到任务注册表中?
遇到了完全相同的问题,花了几个小时才能找到解决方案,因为我90%肯定这是一个错误。在您的课堂任务中,尝试以下
class BaseCeleryTask(app.Task):
def __init__(self):
self.name = "[modulename].BaseCeleryTask"
class MacroReportTask(app.Task):
def __init__(self):
self.name = "[modulename].MacroReportTask"
似乎在应用程序上注册它仍然有一个错误,该错误未自动配置名称。让我知道这是否有效。