我正面临concurrent.futures
的ProcessPoolExecutor
的问题,我正试图在我的一个类中使用它:
def __init__():
self._pool = ProcessPoolExecutor()
def handle_event():
...
filepath: Path = xxxx
future = self._pool.submit(self.test, filepath)
res = future.result()
self.logger.debug(res)
def test(self, filepath: Path):
print("Test ProcessPoolExecutor")
return 3
这个小代码有一个奇怪的行为。首先,当我删除使用future.result()
获取结果时,我在test()
函数中看不到print
消息的输出。
然后,当我明确询问未来的结果时,我得到了一个TypeError
:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/queues.py", line 239, in _feed
obj = _ForkingPickler.dumps(obj)
File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: cannot pickle 'PyCapsule' object
我不明白的是,我发送的类型(
Path
(和接收的类型(int
(都是可pickle的。其次,我甚至不知道PyCapsule依赖是什么,因为它没有出现在我的
requirements.txt
中,也没有出现在pip freeze
(相关的SO帖子(中
.nox/run/bin/pip freeze | grep -E '(capsule|dill)'
知道为什么我看不到打印报表出现吗?PyCapsule错误怎么办?是我的类型有问题,还是应用程序中的其他地方有问题?
谢谢!
将作业调度到concurrent.futures.ProcessPoolExecutor
时,您同时发送函数名称和参数。子进程接收函数名称并在其内存中查找。
当您传递对象方法时,事情会变得更加复杂。子进程在其函数中找不到这样的方法。因此,父对象必须pickle并运送整个对象。
这就是你遇到问题的地方:
cls(buf, protocol).dump(obj)
pickle协议不知道如何序列化对象,因为它包含不可拾取的组件。特别是,PyCapsule是一个内部Python数据结构。
建议不要将对象方法传递到处理池,因为很难预测对象是否可拾取。此外,序列化整个对象并通过管道传输它(而不仅仅是传递函数名(会增加成本。
有关哪些内容易于抓取,哪些内容不易于抓取的更多信息,请参阅其模块文档。
如果你不能遵守上面的建议,你可以看看其他的pickle实现,比如dill。