我想将一个函数作为参数传递给一个芹菜任务。我在stackoverflow中发现了两个类似的问题(1和2(。我尝试了答案中提到的解决方案,这就是我目前正在做的:
内部呼叫模块:
import marshal
def function_to_be_passed:
# ...
serialized_func_code = marshal.dumps(function_to_be_passed.__code__)
celery_task.delay(serialized_func_code)
在芹菜任务中,我正在反序列化并调用函数:
import marshal, types
from celery.task import task
@task()
def celery_task(serialized_func_code):
code = marshal.loads(serialized_func_code)
func = types.FunctionType(code, globals(), "some_func_name")
# calling the deserialized function
func()
然而,在调用celery_task.delay(serialized_func_code)
时,我得到一个错误:
kombu.exceptions.EncodeError:"utf-8"编解码器无法解码中的字节0xe3位置0:无效的连续字节
共享下面的堆栈跟踪:
File “...caller.py",
celery_task.delay(
File "/python3.8/site-packages/celery/app/task.py", line 426, in delay
return self.apply_async(args, kwargs)
File "/python3.8/site-packages/celery/app/task.py", line 555, in apply_async
content_type, content_encoding, data = serialization.dumps(
File "/python3.8/site-packages/kombu/serialization.py", line 221, in dumps
payload = encoder(data)
File "/python3.8/contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "/python3.8/site-packages/kombu/serialization.py", line 54, in _reraise_errors
reraise(wrapper, wrapper(exc), sys.exc_info()[2])
File "/python3.8/site-packages/vine/five.py", line 194, in reraise
raise value.with_traceback(tb)
File "/python3.8/site-packages/kombu/serialization.py", line 50, in _reraise_errors
yield
File "/python3.8/site-packages/kombu/serialization.py", line 221, in dumps
payload = encoder(data)
File "/python3.8/site-packages/kombu/utils/json.py", line 69, in dumps
return _dumps(s, cls=cls or _default_encoder,
File "/python3.8/site-packages/simplejson/__init__.py", line 398, in dumps
return cls(
File "/python3.8/site-packages/simplejson/encoder.py", line 296, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/python3.8/site-packages/simplejson/encoder.py", line 378, in iterencode
return _iterencode(o, 0)
kombu.exceptions.EncodeError: 'utf-8' codec can't decode byte 0xe3 in position 0: invalid continuation byte
当我使用pickle提供的dumps
和loads
时,也会出现类似的错误。
kombu.exceptions.EncodeError:"utf-8"编解码器无法解码中的字节0x80位置0:起始字节无效
我使用的是Django版本2.2.17和Python版本3.8.5。
在这里可以找到一个好的答案。总之,您可以将任务序列化程序更改为pickle,而不是这里提到的json。小心不要从外部资源中榨取任何东西,因为这可能很危险。