我正在使用芹菜4.0.2版本。
与以前版本的 Celery 相比,基于类的任务似乎不会自动注册(即,如果您配置了自动发现(。
但是,我什至无法手动注册基于类的任务。
根据芹菜更改日志:
http://docs.celeryproject.org/en/latest/changelog.html#version-4-0-1
从版本 4.0.1 开始,应该可以手动注册任务:
from celery import Celery, Task
app = Celery()
class CustomTask(Task):
def run(self):
return 'hello'
app.register_task(CustomTask())
但这似乎行不通。有谁知道如何实现这一目标?
我尝试了一些正在讨论的建议(除了集成 https://github.com/celery/celery/issues/3744 中提到的自定义任务加载器(:
注册基于芹菜类的任务
https://github.com/celery/celery/issues/3615
https://github.com/celery/celery/issues/3744
快到了!您需要对注册的任务调用delay()
。
这将起作用:
from celery import Celery, Task
app = Celery()
class CustomTask(Task):
def run(self):
return 'hello'
task = CustomTask()
app.register_task(task)
task.delay()
如果您需要shared_task
装饰器:
from celery import Task, shared_task
class CustomTask(Task):
def process(self):
return 'hello'
@shared_task(bind=True, base=CustomTask)
def custom(self):
self.process()
process
是启动任务的自定义名称(修饰器覆盖run
方法(
bind=True
将函数绑定到类实例
base=CustomTask
为任务设置基类