将客户端函数传递给芹菜任务



我想将客户端创建的函数传递给芹菜任务并执行它。例如,我正在尝试编写一个map函数,该函数接受一个函数f和一个列表l并在芹菜任务中执行map(f, l)

据推测,该函数没有正确序列化(可以理解,这很难)。但是,有什么办法可以做到这一点吗?最佳实践是什么?我想我可以传递一个字符串,然后exec它,但我宁愿不仅仅是我的应用程序的工作方式。

编辑:我找到了一种序列化函数的方法...我想我可以把它总结起来做我需要做的事情。有什么更好的主意吗?


from celery import Celery
app = Celery('tasks', broker='redis://localhost', backend='redis://localhost')
@app.task
def cp_map(f, l):
    return map(f, l)

然后,我尝试将此任务用于此:

In [20]: from tasks import cp_map
In [21]: def f(x): return x + 1
In [22]: cp_map.delay(f, [1,2,3])
Out[22]: <AsyncResult: 27baf8bf-8ef3-496d-a445-ebd7ee94e206>
In [23]: _.status
Out[23]: 'PENDING'

在工人身上,我得到这个:

[2014-02-09 22:27:00,828: CRITICAL/MainProcess] Can't decode message body: DecodeError(AttributeError("'module' object has no attribute 'f'",),) (type:u'application/x-python-serialize' encoding:u'binary' raw:"'\x80\x02}q\x01(U\x07expiresq\x02NU\x03utcq\x03\x88U\x04argsq\x04c__main__\nf\nq\x05]q\x06(K\x01K\x02K\x03e\x86q\x07U\x05chordq\x08NU\tcallbacksq\tNU\x08errbacksq\nNU\x07tasksetq\x0bNU\x02idq\x0cU$27baf8bf-8ef3-496d-a445-ebd7ee94e206q\rU\x07retriesq\x0eK\x00U\x04taskq\x0fU\x0ctasks.cp_mapq\x10U\ttimelimitq\x11NN\x86U\x03etaq\x12NU\x06kwargsq\x13}q\x14u.' (233b)"')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 585, in _receive_callback
    decoded = None if on_m else message.decode()
  File "/usr/local/lib/python2.7/dist-packages/kombu/message.py", line 142, in decode
    self.content_encoding, accept=self.accept)
  File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 184, in loads
    return decode(data)
  File "/usr/lib/python2.7/contextlib.py", line 35, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 59, in _reraise_errors
    reraise(wrapper, wrapper(exc), sys.exc_info()[2])
  File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 55, in _reraise_errors
    yield
  File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 184, in loads
    return decode(data)
  File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 64, in pickle_loads
    return load(BytesIO(s))
DecodeError: 'module' object has no attribute 'f'

可以使用 marshal 将函数序列化为字符串,然后在任务中反序列化它。我不知道这是否是最好的方法,但它会起作用。你可能还想看看莳萝。

下面是从另一个堆栈溢出答案复制的一些示例代码:

import marshal
def foo(x): return x*x
code_string = marshal.dumps(foo.func_code)

然后在任务中:

import marshal, types
code = marshal.loads(code_string)
func = types.FunctionType(code, globals(), "some_func_name")
func(10)  # gives 100

有时您可能会遇到编码复杂函数的问题。您可以在编码过程中忽略或替换错误。

__ENCODING_FORMAT = 'ISO-8859-1'  # ISO-8859-1 is a work-around to avoid UnicodeDecodeError on 'byte 0x83'
def _serialize_func(func):
    """ converts func into code string and encodes it with ISO-8859-1 """
    return unicode(marshal.dumps(func.func_code), encoding=__ENCODING_FORMAT, errors='replace')

def _load_func(code_string):
    """ loads func from code string, decodes from unicode to str"""
    code = marshal.loads(code_string.encode(__ENCODING_FORMAT))
    return types.FunctionType(code, globals())

如果您引用的函数位于带有 __main__ 指令的文件中,也会出现这种类型的错误,即: 包含函数定义的文件如下所示:

def f(*args):
    ... some code here ...
if __name__ == "__main__":
    ... some code here ...

如果是这种情况,将函数定义放在与"__main__"引用代码分开的文件中应该可以解决问题。

假设这个简单的重构确实适用于你的用例,它比上面的元帅柔术简单得多。

相关内容

  • 没有找到相关文章

最新更新