我想覆盖Celery的任务类。我可以覆盖on_success和on_failure方法,但运行方法对我来说并不容易。我尝试使用绑定方法。我的代码如下:
class MyTask(Task):
def on_success(self, retval, task_id, args, kwargs):
print("success")
def on_failure(self, exc, task_id, args, kwargs, einfo):
print("failed")
def bind(self, app):
return super(self.__class__, self).bind(app)
def run(self, *args, **kwargs):
x = kwargs.get('data', None)
x = x**2
if __name__=="__main__":
mytask = MyTask()
app = Celery('mytask', backend='redis', broker='redis://localhost')
mytask.bind(app)
job = mytask.apply_async(data = 1)
但是当我运行代码时,出现以下错误:
Received unregistered task of type None.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
Traceback (most recent call last):
File "/home/ayandeh/anaconda3/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 559, in on_task_received
strategy = strategies[type_]
KeyError: None
我搜索了很多,但没有得到结果。我应该如何注册任务?
不需要显式绑定应用程序,因为芹菜任务将在调用apply_async时自动绑定到current_app。如果要显式绑定,可以使用两种方法:1. 任务绑定(应用程序(2. 从应用程序创建任务类。任务。通过继承应用程序。任务,此应用程序将被绑定。
您的问题与绑定无关。这是关于任务注册表的。更正如下:
from celery import Celery, Task
class MyTask(Task):
name = 'mytask'
def on_success(self, retval, task_id, args, kwargs):
print("success")
def on_failure(self, exc, task_id, args, kwargs, einfo):
print("failed")
def run(self, *args, **kwargs):
x = kwargs.get('data', None)
print(x**2)
if __name__ == "__main__":
app = Celery('mytask', backend='redis', broker='redis://localhost')
MyTask.bind(app)
app.tasks.register(MyTask)
app.worker_main()
希望对您有所帮助。