I am trying to run a program that uses gcsfs's GCSFileSystem to access Google Cloud Storage from Python's concurrent.futures.ProcessPoolExecutor.
The actual code is fairly complex, but I managed to boil it down to this minimal non-working example:
```python
from concurrent.futures import ProcessPoolExecutor
from gcsfs import GCSFileSystem

def f(path):
    print(f"Creating {path}...")
    print("Created. Getting glob...")
    print(main_fs.glob(path))
    print("Done!")

if __name__ == "__main__":
    main_fs = GCSFileSystem()
    print(main_fs.glob("code_tests_sand"))
    with ProcessPoolExecutor(max_workers=10) as pool:
        l_ = []
        for sub_rules_list in pool.map(f, ["code_tests_sand"]):
            l_.append(0)
```
I expect:

```
['code_tests_sand']
Creating code_tests_sand...
Created. Getting glob...
['code_tests_sand']
Done!
```
What I get:

```
['code_tests_sand']
Creating code_tests_sand...
Created. Getting glob...
```

and the program hangs there and never finishes.
I found a way to get the expected output by explicitly passing the GCSFileSystem object to the function:
```python
from concurrent.futures import ProcessPoolExecutor
from gcsfs import GCSFileSystem

def f(path, ff):
    print(f"Creating {path}...")
    print("Created. Getting glob...")
    print(ff.glob(path))
    print("Done!")

if __name__ == "__main__":
    main_fs = GCSFileSystem()
    print(main_fs.glob("code_tests_sand"))
    with ProcessPoolExecutor(max_workers=10) as pool:
        l_ = []
        for sub_rules_list in pool.map(f, ["code_tests_sand"], [main_fs]):
            l_.append(0)
```
However, this is not a good solution for me, because I won't be able to do this in my actual code. Do you know why this happens, and how can I fix it?
FYI, I am running on Ubuntu 18 with Python 3.8, and here is my pip freeze output:
```
aiohttp==3.7.3
async-timeout==3.0.1
attrs==20.3.0
cachetools==4.2.1
certifi==2020.12.5
chardet==3.0.4
decorator==4.4.2
fsspec==0.8.5
gcsfs==0.7.2
google-auth==1.27.0
google-auth-oauthlib==0.4.2
idna==2.10
multidict==5.1.0
oauthlib==3.1.0
pyasn1==0.4.8
pyasn1-modules==0.2.8
requests==2.25.1
requests-oauthlib==1.3.0
rsa==4.7.1
six==1.15.0
typing-extensions==3.7.4.3
urllib3==1.26.3
yarl==1.6.3
```
I eventually found a way: wrapping the GCSFileSystem in a class:
```python
from concurrent.futures import ProcessPoolExecutor
from gcsfs import GCSFileSystem
from copy import copy
import sys

class Dummy:
    fs = None

    @classmethod
    def set_fs(cls, fs):
        cls.fs = fs

    def __init__(self, path):
        self.fs = copy(Dummy.fs)
        self.path = path

    def glob(self):
        return self.fs.glob(self.path)

def f(path):
    print(f"Creating {path}...")
    p = Dummy(path)
    print("Created. Getting glob...")
    print(p.glob())
    print(sys.getsizeof(p.fs))
    print("Done!")

if __name__ == "__main__":
    main_fs = GCSFileSystem()
    print(main_fs.glob("code_tests_sand"))
    Dummy.set_fs(main_fs)
    with ProcessPoolExecutor(max_workers=10) as pool:
        l_ = []
        for sub_rules_list in pool.map(f, ["code_tests_sand"]):
            l_.append(0)
```
Note that copying the GCSFileSystem object in each class instantiation is required. Without the copy, the class works fine as long as there is no multiprocessing, but it shows the same problematic behavior under multiprocessing. The GCSFileSystem here weighs only about 50 bytes, so copying it does not hurt memory much.
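An alternative pattern (my own sketch, not part of the original answer) is to create the client lazily inside each worker process, so no worker ever touches a copy inherited from the parent. `FakeClient` below is a hypothetical stand-in for `GCSFileSystem` so the sketch runs without GCS credentials; in real code you would construct `GCSFileSystem()` in its place:

```python
from concurrent.futures import ProcessPoolExecutor

# Hypothetical stand-in for an expensive, non-fork-safe client such as
# GCSFileSystem; glob here just echoes the path back.
class FakeClient:
    def glob(self, path):
        return [path]

_client = None  # one lazily created client per worker process

def get_client():
    # Create the client inside the current process on first use instead of
    # inheriting a forked copy from the parent process.
    global _client
    if _client is None:
        _client = FakeClient()
    return _client

def f(path):
    return get_client().glob(path)

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as pool:
        print(list(pool.map(f, ["code_tests_sand"])))
```

Each worker pays the construction cost once, on its first task, and every later task in that worker reuses the same client.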
The problem is related to Linux's default process start method. On my Mac, multiprocessing worked as expected, but when deployed on a Linux VM it could not even initialize gcsfs. The article below explains the issue:
https://britishgeologicalsurvey.github.io/science/python-forking-vs-spawn/
I found that the simplest fix is to set the start method:
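The fork behavior the article describes can be illustrated with a small sketch (my own illustration, not from the article): a child started with the `fork` method inherits the parent's live state, which is how gcsfs's internal session ends up shared with the workers:

```python
import multiprocessing as mp

STATE = {"initialized": False}

def report(conn):
    # Under fork, the child inherits the parent's memory, including STATE
    # as mutated after import; under spawn, the child would re-import the
    # module and see the original value instead.
    conn.send(STATE["initialized"])
    conn.close()

ctx = mp.get_context("fork")  # Linux's default start method
STATE["initialized"] = True   # mutate state after import, before forking
parent, child = ctx.Pipe()
p = ctx.Process(target=report, args=(child,))
p.start()
inherited = parent.recv()
p.join()
```

Here `inherited` is True: the forked child sees the mutated state. When that inherited state is a running asyncio event loop, as in gcsfs, the child deadlocks on it.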
```python
import multiprocessing as mp
...
...
...
if __name__ == '__main__':
    mp.set_start_method('forkserver')
```