I've got a bug here that I'm trying to solve. Along the way I tried to build a self-contained function to reproduce the problem, but for some reason it works as expected in the micro example and not in my production code.
I have a subclass of pathlib.Path:
class WalkPath(Path):
    _flavour = type(Path())._flavour

    def __init__(self, *args, origin: 'WalkPath' = None, dirs: [] = None, files: [] = None):
        super().__init__()
        if type(args[0]) is str:
            self.origin = origin or self
        else:
            self.origin = origin or args[0].origin
        self._dirs: [WalkPath] = list(map(WalkPath, dirs)) if dirs else None
        self._files: [WalkPath] = list(map(WalkPath, files)) if files else None
        self._lazy_attr = None

    @staticmethod
    def sync(wp: Union[str, Path, 'WalkPath']):
        """Synchronize lazy-loaded attributes"""
        x = wp.lazy_attr
        return wp

    @property
    def lazy_attr(self):
        if self._lazy_attr:
            return self._lazy_attr
        # long running op
        self._lazy_attr = long_running_op(self)
        return self._lazy_attr
class Find:
    @staticmethod
    def shallow(path: Union[str, Path, 'WalkPath'],
                sort_key=lambda p: str(p).lower(),
                hide_sys_files=True) -> Iterable['WalkPath']:
        origin = WalkPath(path)
        if origin.is_file():
            return [origin]
        for p in sorted(origin.iterdir(), key=sort_key):
            if hide_sys_files and is_sys_file(p):
                continue
            yield WalkPath(p, origin=origin)
Using multiprocessing.Pool, I want to execute that long-running operation in the pool. It looks like this:
_paths = ['/path1', '/path2']
found = list(itertools.chain.from_iterable(Find.shallow(p) for p in _paths))
Find.shallow (see above) basically just performs Path.iterdir on the origin, then maps the results to WalkPath objects, setting origin to the path it was called with. I know this works, because it prints correctly:
for x in found:
print(x.origin, x.name)
Then we dispatch to a pool:
with mp.Pool() as pool:
    done = [x for x in pool.map(WalkPath.sync, found) if x.origin]
But this fails, raising 'WalkPath' object has no attribute 'origin'.
Here is my attempt at reproducing it locally, but for some reason it works! I can't see what's different.
#!/usr/bin/env python
import multiprocessing as mp
import time
from itertools import tee, chain

r = None

class P:
    def __init__(self, i, static=None):
        # self.static = static if not static is None else i
        self.static = static or i
        # print(static, self.static)
        self.i = i
        self._a_thing = None

    @property
    def a_thing(self):
        if self._a_thing:
            print('Already have thing', self.i, 'static:', self.static)
            return self._a_thing
        time.sleep(0.05)
        print('Did thing', self.i, 'static:', self.static)
        self._a_thing = True
        return self._a_thing

    @staticmethod
    def sync(x):
        x.a_thing
        x.another = 'done'
        return x if x.a_thing else None

class Load:
    @classmethod
    def go(cls):
        global r
        if r:
            return r
        paths = [iter(P(i, static='0') for i in range(10)),
                 iter(P(i, static='0') for i in range(11, 20)),
                 iter(P(i, static='0') for i in range(21, 30))]
        iternums, testnums = tee(chain.from_iterable(paths))
        for t in testnums:
            print('Want thing', t.i, 'to have static:', t.static)
        with mp.Pool() as pool:
            rex = [x for x in pool.map(P.sync, list(iternums)) if x.another]
        r = rex
        for done in rex:
            print(done.i, done.static, done.a_thing, done.another)

Load.go()
The crux of the problem is that Path objects cannot be shared between interpreter processes. Instead, when using multiprocessing, Python serializes (pickles) all arguments to, and return values from, the subprocesses.
It seems that pathlib.Path defines custom pickling/unpickling logic that is incompatible with your origin attribute:
import pathlib
import pickle

class WalkPath(pathlib.Path):
    _flavour = type(pathlib.Path())._flavour

    def __init__(self, *args, origin: 'WalkPath' = None, dirs: [] = None, files: [] = None):
        super().__init__()
        if type(args[0]) is str:
            self.origin = origin or self
        else:
            self.origin = origin or args[0].origin
        self._dirs: [WalkPath] = list(map(WalkPath, dirs)) if dirs else None
        self._files: [WalkPath] = list(map(WalkPath, files)) if files else None
        self._lazy_attr = None

path = WalkPath('/tmp', origin='far away')
print(vars(path))
reloaded = pickle.loads(pickle.dumps(path))
print(vars(reloaded))
$ python3.9 test.py
{'origin': 'far away', '_dirs': None, '_files': None, '_lazy_attr': None}
{'origin': WalkPath('/tmp'), '_dirs': None, '_files': None, '_lazy_attr': None}
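By contrast, an ordinary class with no custom pickling logic, like P in the repro script above, round-trips its instance __dict__ intact under pickle's default protocol, which is why the micro example kept static while WalkPath lost origin. A minimal sketch of that difference:

```python
import pickle

# A plain class: pickle's default behavior stores and restores
# the instance __dict__, so extra attributes survive.
class P:
    def __init__(self, i, static=None):
        self.static = static or i
        self.i = i

p = P(1, static='0')
restored = pickle.loads(pickle.dumps(p))
print(vars(restored))  # both attributes survive the round trip
```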
Just for fun, here's how I solved it.
What's happening here is that Path implements a __reduce__ function, which is called before __getstate__ or __setstate__ (the higher-level pickling functions).
Here is the __reduce__ function of PurePath, the base class of Path:
def __reduce__(self):
    # Using the parts tuple helps share interned path parts
    # when pickling related paths.
    return (self.__class__, tuple(self._parts))
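You can watch this __reduce__ in action: the callable it hands back to pickle is the class itself, and the only arguments are the path parts, so any extra instance state never makes it into the pickle. A small sketch (using PurePosixPath so the parts are platform-independent):

```python
from pathlib import PurePosixPath

p = PurePosixPath('/tmp/demo')
func, args = p.__reduce__()
# The reconstruction recipe is just "call the class with the parts";
# nothing else about the instance is recorded.
print(func, args)
```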
Oh no! Well, we can see what happens: by design, it simply passes along a tuple of its parts, dropping state entirely and forming a fresh version of itself.
I didn't want to mess with that, but I also wanted to make sure my state was preserved here. So I created a serializer that passes those attributes along as tuple arguments (because, for some absurd reason, __reduce__ only takes a single tuple as an argument).
I also had to make sure that origin was now a Path object, not a WalkPath object, or I would end up in endless recursion. I added some coercion and type safety to __init__:
if origin:
    self.origin = Path(origin)
elif len(args) > 0:
    try:
        self.origin = Path(args[0].origin) or Path(args[0])
    except:
        self.origin = Path(self)

if not self.origin:
    raise AttributeError(f"Could not infer 'origin' property when initializing 'WalkPath', for path '{args[0]}'")
Then I added these two methods to WalkPath:
# @overrides(__reduce__)
def __reduce__(self):
    # From super():
    #   Using the parts tuple helps share interned path parts
    #   when pickling related paths.
    #   return (self.__class__, tuple(self._parts))
    # This override passes its parts to a Path object (which
    # natively pickles), then serializes and applies
    # its remaining attributes.
    args = {**{'_parts': self._parts}, **self.__dict__}
    return (self.__class__._from_kwargs, tuple(args.items()))

@classmethod
def _from_kwargs(cls, *args):
    kwargs = dict(args)
    new = cls(super().__new__(cls, *kwargs['_parts']),
              origin=kwargs['origin'])
    new.__dict__ = {**new.__dict__, **kwargs}
    return new
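Stripped down to a self-contained sketch, the same technique looks like this. The class name OriginPath and the attribute are illustrative, not from the code above, and PurePosixPath stands in for Path to keep the example platform-neutral:

```python
import pickle
from pathlib import PurePosixPath

class OriginPath(PurePosixPath):
    """Hypothetical minimal version of the fix: override __reduce__
    to rebuild from the path parts, then restore the instance dict."""

    def __reduce__(self):
        # Ship the parts and the instance attributes together.
        return (self.__class__._rebuild, (self.parts, dict(self.__dict__)))

    @classmethod
    def _rebuild(cls, parts, state):
        new = cls(*parts)
        new.__dict__.update(state)
        return new

p = OriginPath('/data/photos')
p.origin = 'somewhere'
restored = pickle.loads(pickle.dumps(p))
print(restored, restored.origin)  # the extra attribute now survives
```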