注册基于芹菜类的任务



Python 3.x, Celery 4.x...

我有一个基于类的任务。

myproj/celery.py

from celery import Celery
# django settings stuff...
app = Celery('myproj')
app.autodiscover_tasks()

app1/tasks.py

import celery
class EmailTask(celery.Task):
    def run(self, *args, **kwargs):
        self.do_something()

如果我这样做:

$ celery worker -A myproj -l info
[tasks]
  . app2.tasks.debug_task
  . app2.tasks.test

因此,芹菜装饰器工作以注册任务,但未注册基于类的任务。

如何注册基于类的任务?

更新 1:

如果我将以下行添加到app1/tasks.py

from myproj.celery import app
email_task = app.tasks[EmailTask.name]

.

$ celery worker -A myproj -l info
  File "myproj/app1/tasks.py", line 405, in <module>
    email_task = app.tasks[EmailTask.name]
  File "/usr/local/lib/python3.5/site-packages/celery/app/registry.py", line 19, in __missing__
    raise self.NotRegistered(key)
celery.exceptions.NotRegistered

更新 2:

能够通过包装器同步(run)执行我的任务。但是,我无法异步运行任务,即通过delay.

app1/tasks.py

@app.task
def email_task():
    """
    Wrapper to call class based task
    """
    task = EmailTask()
    # task.delay()  # Won't work!!!
    task.run()

.

$./manage.py shell
> from app1.tasks import EmailTask
> task1 = EmailTask()
> task1.run() # a-okay
> task2 = EmailTask()
> task2.delay() # nope
  <AsyncResult: 1c03bad9-169a-4a4e-a56f-7d83892e8bbc>
# And on the worker...
[2017-01-22 08:07:28,120: INFO/PoolWorker-1] Task app1.tasks.email_task[41e5bc7d-058a-400e-9f73-c853c0f60a2a] succeeded in 0.0701281649817247s: None
[2017-01-22 08:10:31,909: ERROR/MainProcess] Received unregistered task of type None.
The message has been ignored and discarded.

你可以在这里找到完整的描述,但对我来说,添加

from myapp.celery import app
app.tasks.register(MyTaskTask())

有了celery==4.2.1,我必须使用 Celery.register_task() 的返回值作为调用delay()的任务实例:

# my_app/tasks.py
import celery
from my_app.celery import app
class MyTask(celery.Task):
    def run(self):
        [...]
MyTask = app.register_task(MyTask())

然后使用它:

# my_app/app.py
from my_app.tasks import MyTask
[...]
MyTask.delay()

该解决方案已在 Github 问题中描述,并在此处进行了记录。

呵,dtk

我只是花了很长时间弄清楚为什么我的工人在启动它时崩溃了。

如果您正在接收:

AttributeError: 'NoneType' object has no attribute 'push'当您启动工作线程时

请勿使用
app.tasks.register(MyTask())

而是使用:
app.register_task(MyTask())

在此处查看更多内容: https://github.com/celery/celery/issues/7173

注意 - 这是在芹菜 5.3.0 上完成

相关内容

  • 没有找到相关文章

最新更新