我在设备a上有一个多处理程序,它使用队列和SyncManager使其通过网络可访问。队列在设备上存储来自模块的自定义类,该类由多处理包自动pickle为module.class。
在通过SyncManager读取队列的另一个设备上,我有相同的模块作为包的一部分,而不是像在设备a上那样的顶层。这意味着当我试图从队列中读取一个项目时,我得到一个ModuleNotFoundError,因为unpickler不知道模块现在是package.module。
我看到过这样的解决方法,它使用了一个基于pickler的新类。Unpicker和似乎最不容易和可扩展:https://stackoverflow.com/a/53327348/5683049但是,我不知道如何指定要使用的多进程解pickle类。
我看到这可以为reducer类做,所以我假设有一种方法也可以设置unpickler?
我从来没有见过这样做的方法。你可能得破解一下这个。让多处理器系统认为您正在传递字节字符串或字节数组,并让您的用户代码执行pickle和unpickle。
黑客吗?是的。但也不会比你已经要做的更糟。
使用:
- 如何更改多处理模块使用的序列化方法?
- https://stackoverflow.com/a/53327348/5683049
我能够使用类似于下面的代码使其工作:
from multiprocessing.reduction import ForkingPickler, AbstractReducer
import pickle
import io
multiprocessing.context._default_context.reducer = MyPickleReducer()
class RenameUnpickler(pickle.Unpickler):
def find_class(self, module, name):
renamed_module = module
if module == "old_module_name":
renamed_module = "new_package.module_name"
return super(RenameUnpickler, self).find_class(renamed_module, name)
class MyForkingPickler(ForkingPickler):
# Method signature from pickle._loads
def loads(self, /, *, fix_imports=True, encoding="ASCII", errors="strict",
buffers=None):
if isinstance(s, str):
raise TypeError("Can't load pickle from unicode string")
file = io.BytesIO(s)
return RenameUnpickler(file, fix_imports=fix_imports, buffers=buffers,
encoding=encoding, errors=errors).load()
class MyPickleReducer(AbstractReducer):
ForkingPickler = MyForkingPickler
register = MyForkingPickler.register
如果您想进一步覆盖解pickle的执行方式,这可能很有用,但在我最初的情况下,使用
重定向模块可能更容易:from new_package import module_name
sys.modules['old_module_name'] = module_name