给定布局:
background
tasks
__init__.py
generic.py
helpers.py
__init__.py
_server.py
config.py
router.py
server.py
并与celery -A background._server worker
一起启动_server.py
尝试使用.delay(..)
调用 generic.adder
函数时,我在 Worker 中得到了一个KeyError: u'generic.adder'
加法器功能:
文件generic.py
from background.server import app
from background.tasks.helpers import standardized_task
@standardized_task(app, name='generic.adder')
def adder(x, y):
return x + y
..包装了一个函数,该函数采用app
实例并将 Celery Task 的输入/输出标准化为 JSON 对象,该对象返回结果和函数。(包括在下面)但是,问题是当这个包装函数与generic.adder位于同一文件中时,它可以完美地工作 - 当它被导入并如上所述使用时,它会抛出密钥错误。
我被引导相信包装器以某种方式修改了传递给app.task
的name=..
属性,该属性具有来自helpers.py
的函数名称,这导致从任务访问时找不到generic.adder
的文字名称。
同样重要的是要注意,如果你尝试从内部调用adder(..)
_server.py
(模块从芹菜 CLI 运行),它就可以完美地工作;只有当通过分布式接口调用时,才会抛出错误;这意味着,导入独立于 Celery 工作。
文件helpers.py
__author__ = 'Blake'
import types
JSON_TYPES = [
dict, list, unicode, str, int, long, float, bool, types.NoneType
]
def standardized_task(app, *args, **kwargs):
def wrapped_task(fn):
def wrapped_fn(*fnargs, **fnkwargs):
throws = fnkwargs.get('throws', Exception)
raises = fnkwargs.get('raises', False)
if not hasattr(throws, '__call__') and not isinstance(throws(), Exception):
raise ValueError('throws value not of type Exception: %s' % type(throws))
result, error = None, None
try:
result = fn(*fnargs, **fnkwargs)
if type(result) not in JSON_TYPES:
result = unicode(result)
except throws, e:
error = e
if raises:
raise
finally:
return {
'result': result,
'error': str(error) if error else None,
'meta': {
'args': fnargs, 'kwargs': fnkwargs
}
}
return app.task(wrapped_fn, *args, **kwargs)
return wrapped_task
文件_server.py
from background.server import app
from background.tasks.generic import *
答案不是使用装饰器,而是扩展芹菜。任务成抽象类并使用,@app.task(name='...', base=MyNewAbstractTask)
以下SO帖子更好地解释了它:
芹菜任务和自定义装饰器
import types
JSON_TYPES = [
dict, list, unicode, str, int, long, float, bool, types.NoneType
]
class StandardizedTask(Task):
abstract = True
def __call__(self, *args, **kwargs):
return self.inner_run(*args, **kwargs)
def inner_run(self, *args, **kwargs):
throws = kwargs.get('throws', Exception)
raises = kwargs.get('raises', False)
if not hasattr(throws, '__call__') and not isinstance(throws(), Exception):
raise ValueError('throws value not of type Exception: %s' % type(throws))
result, error = None, None
try:
result = self.run(*args, **kwargs)
if type(result) not in JSON_TYPES:
result = unicode(result)
except throws, e:
error = e
if raises:
raise
finally:
return {
'result': result,
'error': str(error) if error else None,
'meta': {
'args': args, 'kwargs': kwargs }}