芹菜:基于自定义基础/子类的任务未在app.tasks下显示



我正在尝试创建一些芹菜任务作为类,但是有一些困难。课程是:

class BaseCeleryTask(app.Task):
def is_complete(self):
    """ default method for checking if celery task has completed. """
    # simply return result (since by default tasks return boolean indicating completion)
    try:
        return self.result
    except AttributeError:
        logger.error('Result not defined. Make sure task has run!')
        return False


class MacroReportTask(BaseCeleryTask):
def run(self, params):
    """ Override the default run method with signal factory run"""
    # hold on to the factory
    process = MacroCountryReport(params)
    self.result = process.run()
    return self.result

但是,当我初始化应用程序并检查app.tasks(或运行工作者)时,应用似乎在其注册表中没有上述任务。其他基于功能的任务(using app.task() decorator)似乎已注册罚款。

我将上述任务运行为:

process = SignalFactoryTask()
process.delay(params)

芹菜工人错误,并带有以下消息: Received unregistered task of type None

我认为我遇到的问题是:如何像常规的基于函数的任务一样,将自定义类添加到任务注册表中?

遇到了完全相同的问题,花了几个小时才能找到解决方案,因为我90%肯定这是一个错误。在您的课堂任务中,尝试以下

class BaseCeleryTask(app.Task):
    def __init__(self):
        self.name = "[modulename].BaseCeleryTask"
class MacroReportTask(app.Task):
    def __init__(self):
        self.name = "[modulename].MacroReportTask"

似乎在应用程序上注册它仍然有一个错误,该错误未自动配置名称。让我知道这是否有效。

相关内容

  • 没有找到相关文章

最新更新