我正在努力让芹菜工作。总的来说,我是Python的新手,显然是Celery的新手,我正在尝试获得基本的示例工作。我想运行一个后台任务,只要它处于活动状态,它就应该保留它的状态。因此,我尝试实现从客户端脚本调用时递增整数变量的基本示例。我正在研究具有树莓图像的树莓派。这是我的工作人员任务代码:
from celery import Task, registry, Celery
import celery
celery = Celery('tasks', broker='redis://localhost:6379',backend='redis://localhost:6379')
class MyTask(celery.Task):
a = 0
def __init__(self):
self.a = 0
def increment(self, x):
self.a += x
return self.a
@property
def a(self):
return a
@celery.task(Base=MyTask)
def mytask(x):
mytask.increment(x)
return mytask.a
这是调用脚本:
from tasks import mytask
result = mytask.delay(2)
print result.get(timeout=1)
这会产生以下错误:
[2018-05-21 15:08:23,889: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7610cdf0> (args:('tasks.mytask', '3ae83225-faee-4dc0-a6c1-1a7e7e5eaca2', {'origin': 'gen27381@raspberrypi', 'lang': 'py', 'task': 'tasks.mytask', 'group': None, 'root_id': '3ae83225-faee-4dc0-a6c1-1a7e7e5eaca2', u'delivery_info': {u'priority': 0, u'redelivered': None, u'routing_key': 'celery', u'exchange': u''}, 'expires': None, u'correlation_id': '3ae83225-faee-4dc0-a6c1-1a7e7e5eaca2', 'retries': 0, 'timelimit': [None, None], 'argsrepr': '(2,)', 'eta': None, 'parent_id': None, u'reply_to': 'fdebe38f-7353-3ef2-920b-74f76a294ff7', 'id': '3ae83225-faee-4dc0-a6c1-1a7e7e5eaca2', 'kwargsrepr': '{}'}, '[[2], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": null}]', 'application/json', 'utf-8') kwargs:{})
[2018-05-21 15:08:23,894: DEBUG/MainProcess] Task accepted: tasks2.mytask[3ae83225-faee-4dc0-a6c1-1a7e7e5eaca2] pid:27366
Traceback (most recent call last):
File "test2.py", line 4, in <module>
print result.get(timeout=1)
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 194, in get
on_message=on_message,
File "/usr/local/lib/python2.7/dist-packages/celery/backends/async.py", line 191, in wait_for_pending
return result.maybe_throw(callback=callback, propagate=propagate)
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 299, in maybe_throw
self.throw(value, self._to_remote_traceback(tb))
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 292, in throw
self.on_ready.throw(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/vine/promises.py", line 217, in throw
reraise(type(exc), exc, tb)
File "<string>", line 1, in reraise
celery.backends.base.AttributeError: 'mytask' object has no attribute 'increment'
[2018-05-21 15:08:23,910: ERROR/ForkPoolWorker-2] Task tasks.mytask[3ae83225-faee-4dc0-a6c1-1a7e7e5eaca2] raised unexpected: AttributeError("'mytask' object has no attribute 'increment'",)
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 374, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 629, in __protected_call__
return self.run(*args, **kwargs)
File "/home/pi/example/tasks.py", line 20, in mytask
mytask.increment(x)
File "/usr/local/lib/python2.7/dist-packages/celery/local.py", line 146, in __getattr__
return getattr(self._get_current_object(), name)
AttributeError: 'mytask' object has no attribute 'increment'
使用命令添加工作线程:
芹菜 -A 任务工作者 --loglevel=debug
和测试脚本简单地使用
蟒蛇 test.py
我还尝试直接访问"a"变量,结果出现了相同的错误。很明显,我做错了一些非常基本的事情,但我无法弄清楚是什么。
好的,所以问题确实非常基本。用于定义自定义任务类的语法不是 ChildClass(Base=ParentClass(,而是 ChildClass(base=ParentClass(...我一定是从某个例子中复制的,口译员从来没有想过要抱怨它中的任何一个。 问题中的示例还存在其他几个问题 以下是实际执行其应执行的操作的代码:
from celery import Task, registry, Celery
import celery
class MyTask(celery.Task):
a = 0
def increment(self, x):
self.a += x
return self.a
celery = Celery('tasks', broker='redis://localhost:6379',backend='redis://localhost:6379')
@celery.task(base=MyTask)
def fookyou(x):
val = fookyou.increment(x)
return val
请注意,我还修改了任务名称,以防当唯一的区别是大写字母时,它与自定义类名称混合。显然,要求口译员发现这种类型的错误太过分了......