使用导入的装饰器包装应用程序任务功能时出现芹菜键错误;仅错误与导入



给定布局:

background 
    tasks  
        __init__.py
        generic.py
        helpers.py
    __init__.py
    _server.py
    config.py
    router.py
    server.py

并与celery -A background._server worker一起启动_server.py

尝试使用.delay(..)调用 generic.adder 函数时,我在 Worker 中得到了一个KeyError: u'generic.adder'

加法器功能:

文件generic.py

from background.server import app
from background.tasks.helpers import standardized_task
@standardized_task(app, name='generic.adder')
def adder(x, y):
    return x + y

..包装了一个函数,该函数采用app实例并将 Celery Task 的输入/输出标准化为 JSON 对象,该对象返回结果和函数。(包括在下面)但是,问题是当这个包装函数与generic.adder位于同一文件中时,它可以完美地工作 - 当它被导入并如上所述使用时,它会抛出密钥错误

我被引导相信包装器以某种方式修改了传递给app.taskname=..属性,该属性具有来自helpers.py的函数名称,这导致从任务访问时找不到generic.adder的文字名称。

同样重要的是要注意,如果你尝试从内部调用adder(..) _server.py(模块从芹菜 CLI 运行),它就可以完美地工作;只有当通过分布式接口调用时,才会抛出错误;这意味着,导入独立于 Celery 工作。

文件helpers.py

__author__ = 'Blake'
import types
JSON_TYPES = [
    dict, list, unicode, str, int, long, float, bool, types.NoneType
]
def standardized_task(app, *args, **kwargs):
    def wrapped_task(fn):
        def wrapped_fn(*fnargs, **fnkwargs):
            throws = fnkwargs.get('throws', Exception)
            raises = fnkwargs.get('raises', False)
            if not hasattr(throws, '__call__') and not isinstance(throws(), Exception):
                raise ValueError('throws value not of type Exception: %s' % type(throws))
            result, error = None, None
            try:
                result = fn(*fnargs, **fnkwargs)
                if type(result) not in JSON_TYPES:
                    result = unicode(result)
            except throws, e:
                error = e
                if raises:
                    raise
            finally:
                return {
                    'result': result,
                    'error': str(error) if error else None,
                    'meta': {
                        'args': fnargs, 'kwargs': fnkwargs
                    }
                }
        return app.task(wrapped_fn, *args, **kwargs)
    return wrapped_task

文件_server.py

from background.server import app
from background.tasks.generic import *

答案不是使用装饰器,而是扩展芹菜。任务成抽象类并使用,@app.task(name='...', base=MyNewAbstractTask)

以下SO帖子更好地解释了它:

芹菜任务和自定义装饰器

import types
JSON_TYPES = [
    dict, list, unicode, str, int, long, float, bool, types.NoneType
]
class StandardizedTask(Task):
    abstract = True
    def __call__(self, *args, **kwargs):
        return self.inner_run(*args, **kwargs)
    def inner_run(self, *args, **kwargs):
        throws = kwargs.get('throws', Exception)
        raises = kwargs.get('raises', False)
        if not hasattr(throws, '__call__') and not isinstance(throws(), Exception):
            raise ValueError('throws value not of type Exception: %s' % type(throws))
        result, error = None, None
        try:
            result = self.run(*args, **kwargs)
            if type(result) not in JSON_TYPES:
                result = unicode(result)
        except throws, e:
            error = e
            if raises:
                raise
        finally:
            return {
                'result': result,
                'error': str(error) if error else None,
                'meta': {
                    'args': args, 'kwargs': kwargs }}

相关内容

  • 没有找到相关文章

最新更新