multiprocessing.Pool loses state when calling a function on an object with existing attributes in a pathlib.Path subclass



I've got a bit of a bug here that I'm trying to solve. Along the way, I tried to build a self-contained function that reproduces the problem, but for some reason the micro example works exactly as expected while my production code does not.

I have a subclass of pathlib.Path:

from pathlib import Path
from typing import Iterable, Union

# long_running_op and is_sys_file are defined elsewhere in the project

class WalkPath(Path):
    _flavour = type(Path())._flavour

    def __init__(self, *args, origin: 'WalkPath' = None, dirs: [] = None, files: [] = None):
        super().__init__()

        if type(args[0]) is str:
            self.origin = origin or self
        else:
            self.origin = origin or args[0].origin
        self._dirs: [WalkPath] = list(map(WalkPath, dirs)) if dirs else None
        self._files: [WalkPath] = list(map(WalkPath, files)) if files else None
        self._lazy_attr = None

    @staticmethod
    def sync(wp: Union[str, Path, 'WalkPath']):
        """Synchronize lazy-loaded attributes"""
        x = wp.lazy_attr
        return wp

    @property
    def lazy_attr(self):
        if self._lazy_attr:
            return self._lazy_attr
        # long running op
        self._lazy_attr = long_running_op(self)
        return self._lazy_attr

class Find:
    @staticmethod
    def shallow(path: Union[str, Path, 'WalkPath'],
                sort_key=lambda p: str(p).lower(),
                hide_sys_files=True) -> Iterable['WalkPath']:
        origin = WalkPath(path)
        if origin.is_file():
            return [origin]

        for p in sorted(origin.iterdir(), key=sort_key):
            if hide_sys_files and is_sys_file(p):
                continue
            yield WalkPath(p, origin=origin)

Using multiprocessing.Pool, I want to run that long-running operation in the pool.

It looks like this:

import itertools

_paths = ['/path1', '/path2']
found = list(itertools.chain.from_iterable(Find.shallow(p) for p in _paths))

Find.shallow (see above) basically just runs Path.iterdir on the origin and maps the results to WalkPath objects, setting origin to the path it was called with. I know this works, because the output is correct:

for x in found:
    print(x.origin, x.name)

Then we dispatch to a pool:

import multiprocessing as mp

with mp.Pool() as pool:
    done = [x for x in pool.map(WalkPath.sync, found) if x.origin]

But this fails, raising 'WalkPath' has no attribute 'origin'.

Here is my attempt to reproduce it locally, but for some reason it works! I can't see what's different:

#!/usr/bin/env python
import multiprocessing as mp
import time
from itertools import tee, chain

r = None

class P:

    def __init__(self, i, static=None):
        # self.static = static if not static is None else i
        self.static = static or i
        # print(static, self.static)
        self.i = i

        self._a_thing = None

    @property
    def a_thing(self):
        if self._a_thing:
            print('Already have thing', self.i, 'static:', self.static)
            return self._a_thing
        time.sleep(0.05)
        print('Did thing', self.i, 'static:', self.static)
        self._a_thing = True
        return self._a_thing

    @staticmethod
    def sync(x):
        x.a_thing
        x.another = 'done'
        return x if x.a_thing else None

class Load:

    @classmethod
    def go(cls):

        global r

        if r:
            return r

        paths = [iter(P(i, static='0') for i in range(10)),
                 iter(P(i, static='0') for i in range(11, 20)),
                 iter(P(i, static='0') for i in range(21, 30))]

        iternums, testnums = tee(chain.from_iterable(paths))

        for t in testnums:
            print('Want thing', t.i, 'to have static:', t.static)

        with mp.Pool() as pool:
            rex = [x for x in pool.map(P.sync, list(iternums)) if x.another]

        r = rex

        for done in rex:
            print(done.i, done.static, done.a_thing, done.another)

Load.go()

The crux of the problem is that Path objects are not shared between interpreter processes.

Instead, when multiprocessing is used, Python serializes (pickles) all arguments sent to, and all return values coming back from, the child processes.
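You can reproduce that round trip by hand to see what the pool does to each object (a minimal sketch; roundtrip is a hypothetical helper, not a multiprocessing API):

import pickle

def roundtrip(obj):
    # Roughly what multiprocessing does to every argument sent to a
    # worker and to every value a worker sends back.
    return pickle.loads(pickle.dumps(obj))

If an object loses attributes across roundtrip, it will lose them across pool.map as well.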

It appears that pathlib.Path defines custom pickling/unpickling logic that is incompatible with your origin attribute:

import pathlib
import pickle

class WalkPath(pathlib.Path):
    _flavour = type(pathlib.Path())._flavour

    def __init__(self, *args, origin: 'WalkPath' = None, dirs: [] = None, files: [] = None):
        super().__init__()
        if type(args[0]) is str:
            self.origin = origin or self
        else:
            self.origin = origin or args[0].origin
        self._dirs: [WalkPath] = list(map(WalkPath, dirs)) if dirs else None
        self._files: [WalkPath] = list(map(WalkPath, files)) if files else None
        self._lazy_attr = None

path = WalkPath('/tmp', origin='far away')
print(vars(path))
reloaded = pickle.loads(pickle.dumps(path))
print(vars(reloaded))

$ python3.9 test.py 
{'origin': 'far away', '_dirs': None, '_files': None, '_lazy_attr': None}
{'origin': WalkPath('/tmp'), '_dirs': None, '_files': None, '_lazy_attr': None}

For fun, here's how I ended up solving this.

What happens here is that Path implements a __reduce__ function, which gets called before __getstate__ and __setstate__ (the higher-level pickling functions).
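A tiny sketch (with a hypothetical Demo class) shows that precedence: once __reduce__ is defined, pickle never consults __getstate__:

import pickle

class Demo:
    def __reduce__(self):
        print('__reduce__ called')
        return (Demo, ())

    def __getstate__(self):
        print('__getstate__ called')  # never printed: __reduce__ wins
        return {}

pickle.dumps(Demo())  # prints only '__reduce__ called'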

This is the __reduce__ function of PurePath, the base class of Path:

def __reduce__(self):
    # Using the parts tuple helps share interned path parts
    # when pickling related paths.
    return (self.__class__, tuple(self._parts))

Oh no! Well, we can see what happens: by design, it passes along only a tuple of its parts, completely dropping its state, and builds a brand-new version of itself.
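Concretely, unpickling just calls the first element of that tuple with the second as arguments, so the origin kwarg never makes the trip (a rough sketch of what pickle.loads does with that return value):

cls, args = WalkPath('/tmp', origin='far away').__reduce__()
rebuilt = cls(*args)  # __init__ runs again without origin, so
                      # rebuilt.origin falls back to the path itself,
                      # matching the demo output above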

I didn't want to mess with that, but I also wanted to make sure my state is preserved here. So I created a deserializer that takes these attributes as tuple arguments (because... __reduce__, for some absurd reason, only passes a single tuple as arguments).
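(Worth noting: the pickle protocol does let __reduce__ return an optional third element, the object's state, which pickle applies to the new object's __dict__ after construction when no __setstate__ is defined. A sketch of that more compact variant, untested against this class:

def __reduce__(self):
    # (callable, constructor_args, state): pickle rebuilds the path from
    # its parts, then updates the new object's __dict__ with state.
    return (self.__class__, tuple(self._parts), self.__dict__.copy())

)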

I also had to make sure that origin is now a Path object rather than a WalkPath object, otherwise I'd end up in endless recursion. I added some coercion and type safety in __init__:

if origin:
    self.origin = Path(origin)
elif len(args) > 0:
    try:
        self.origin = Path(args[0].origin) or Path(args[0])
    except AttributeError:
        self.origin = Path(self)

if not self.origin:
    raise AttributeError(f"Could not infer 'origin' property when initializing 'WalkPath', for path '{args[0]}'")

Then I added these two methods to WalkPath:

# @overrides(__reduce__)
def __reduce__(self):
    # From super():
    #   Using the parts tuple helps share interned path parts
    #   when pickling related paths.
    #   return (self.__class__, tuple(self._parts))
    # This override passes its parts to a Path object (which
    # pickles natively), then serializes and applies
    # its remaining attributes.
    args = {**{'_parts': self._parts}, **self.__dict__}
    return (self.__class__._from_kwargs, tuple(args.items()))

@classmethod
def _from_kwargs(cls, *args):
    kwargs = dict(args)
    new = cls(super().__new__(cls, *kwargs['_parts']),
              origin=kwargs['origin'])
    new.__dict__ = {**new.__dict__, **kwargs}
    return new
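With these overrides in place, the round trip from the earlier demo should now keep the state (an untested sketch; the exact repr of origin depends on the coercion added to __init__):

path = WalkPath('/tmp', origin='far away')
reloaded = pickle.loads(pickle.dumps(path))
print(vars(reloaded))
# origin now survives the round trip as a Path (e.g. PosixPath('far away')),
# so pool.map can return WalkPath objects without dropping their state.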
