app芹菜任务依赖于芹菜中的BaseClass



我想为芹菜中的所有任务创建基类,我的代码是

任务//test.py

from celery import app
from base.main import CeleryMain

@app.task(time_limit=10)
def test():
task = CeleryMain.delay()
return task

基础/main.py

from celery import app
import requests
from celery import Celery, Task

class CeleryMain(app.Task):

abstract = True

def run(self, task):
data = task.apply_async()
s = data.get(timeout=10, interval=0.01)
return {'success': True, 'data': s}

task = CeleryMain()
app.register_task(task)
task.delay()

celery.py

imports = (
'tasks.all.test',
)

I have error:

celery.exceptions.NotRegistered: 'tasks.all.test'

请帮助我如何正确创建主类的所有任务没有错误的每个任务基类。

CeleryMain= app.register_task(CeleryMain())

task = CeleryMain.run(task)打印(task.id)

相关内容

  • 没有找到相关文章

最新更新