在中注册基于类的任务



我正在使用芹菜4.0.2版本。

与以前版本的 Celery 相比,基于类的任务似乎不会自动注册(即,如果您配置了自动发现(。

但是,我什至无法手动注册基于类的任务。

根据芹菜更改日志:

http://docs.celeryproject.org/en/latest/changelog.html#version-4-0-1

从版本 4.0.1 开始,应该可以手动注册任务:

from celery import Celery, Task
app = Celery()
class CustomTask(Task):
def run(self):
return 'hello'
app.register_task(CustomTask())

但这似乎行不通。有谁知道如何实现这一目标?

我尝试了一些正在讨论的建议(除了集成 https://github.com/celery/celery/issues/3744 中提到的自定义任务加载器(:

注册基于芹菜类的任务

https://github.com/celery/celery/issues/3615

https://github.com/celery/celery/issues/3744

快到了!您需要对注册的任务调用delay()

这将起作用:

from celery import Celery, Task
app = Celery()

class CustomTask(Task):
def run(self):
return 'hello'

task = CustomTask()
app.register_task(task)
task.delay()

如果您需要shared_task装饰器:

from celery import Task, shared_task
class CustomTask(Task):
def process(self):
return 'hello'
@shared_task(bind=True, base=CustomTask)
def custom(self):
self.process()

process是启动任务的自定义名称(修饰器覆盖run方法(

bind=True将函数绑定到类实例

base=CustomTask为任务设置基类

相关内容

  • 没有找到相关文章

最新更新