更改默认的多进程unpickler类



我在设备a上有一个多处理程序,它使用队列和SyncManager使其通过网络可访问。队列在设备上存储来自模块的自定义类,该类由多处理包自动pickle为module.class。

在通过SyncManager读取队列的另一个设备上,我有相同的模块作为包的一部分,而不是像在设备a上那样的顶层。这意味着当我试图从队列中读取一个项目时,我得到一个ModuleNotFoundError,因为unpickler不知道模块现在是package.module。

我看到过这样的解决方法,它使用了一个基于pickler的新类。Unpicker和似乎最不容易和可扩展:https://stackoverflow.com/a/53327348/5683049但是,我不知道如何指定要使用的多进程解pickle类。

我看到这可以为reducer类做,所以我假设有一种方法也可以设置unpickler?

我从来没有见过这样做的方法。你可能得破解一下这个。让多处理器系统认为您正在传递字节字符串或字节数组,并让您的用户代码执行pickle和unpickle。

黑客吗?是的。但也不会比你已经要做的更糟。

使用:

  • 如何更改多处理模块使用的序列化方法?
  • https://stackoverflow.com/a/53327348/5683049

我能够使用类似于下面的代码使其工作:

from multiprocessing.reduction import ForkingPickler, AbstractReducer
import pickle
import io
multiprocessing.context._default_context.reducer = MyPickleReducer()
class RenameUnpickler(pickle.Unpickler):
    def find_class(self, module, name):
        renamed_module = module
        if module == "old_module_name":
            renamed_module = "new_package.module_name"
        return super(RenameUnpickler, self).find_class(renamed_module, name)
class MyForkingPickler(ForkingPickler):
    # Method signature from pickle._loads       
    def loads(self, /, *, fix_imports=True, encoding="ASCII", errors="strict",
            buffers=None):
        if isinstance(s, str):
            raise TypeError("Can't load pickle from unicode string")
        file = io.BytesIO(s)
        return RenameUnpickler(file, fix_imports=fix_imports, buffers=buffers,
                        encoding=encoding, errors=errors).load()
    
class MyPickleReducer(AbstractReducer):
    ForkingPickler = MyForkingPickler
    register = MyForkingPickler.register

如果您想进一步覆盖解pickle的执行方式,这可能很有用,但在我最初的情况下,使用

重定向模块可能更容易:
from new_package import module_name
sys.modules['old_module_name'] = module_name

相关内容

  • 没有找到相关文章

最新更新