我想为芹菜中的所有任务创建基类,我的代码是
任务//test.py
from celery import app
from base.main import CeleryMain
@app.task(time_limit=10)
def test():
task = CeleryMain.delay()
return task
基础/main.py
from celery import app
import requests
from celery import Celery, Task
class CeleryMain(app.Task):
abstract = True
def run(self, task):
data = task.apply_async()
s = data.get(timeout=10, interval=0.01)
return {'success': True, 'data': s}
task = CeleryMain()
app.register_task(task)
task.delay()
celery.py
imports = (
'tasks.all.test',
)
I have error:
celery.exceptions.NotRegistered: 'tasks.all.test'
请帮助我如何正确创建主类的所有任务没有错误的每个任务基类。
CeleryMain= app.register_task(CeleryMain())
task = CeleryMain.run(task)打印(task.id)