Can't map a function to tarfile members in parallel



I have a tarfile containing bz2-compressed files. I want to apply the function clean_file to each of the bz2 files and collate the results. In series this is easy with a loop:

import pandas as pd
import json
import os
import bz2
import itertools
import datetime
import tarfile
from multiprocessing import Pool

def clean_file(member):
    if '.bz2' in str(member):
        f = tr.extractfile(member)
        with bz2.open(f, "rt") as bzinput:
            dicts = []
            for i, line in enumerate(bzinput):
                line = line.replace('"name"}', '"name":" "}')
                dat = json.loads(line)
                dicts.append(dat)
        bzinput.close()
        f.close()
        del f, bzinput
        processed = dicts[0]
        return processed
    else:
        pass

# Open tar file and get contents (members)
tr = tarfile.open('data.tar')
members = tr.getmembers()
num_files = len(members)

# Apply the clean_file function in series
i = 0
processed_files = []
for m in members:
    processed_files.append(clean_file(m))
    i += 1
    print('done ' + str(i) + '/' + str(num_files))

However, I need to be able to do this in parallel. The approach I tried uses Pool, like so:

# Apply the clean_file function in parallel
if __name__ == '__main__':
    with Pool(2) as p:
        processed_files = list(p.map(clean_file, members))

But this returns an OSError:

Traceback (most recent call last):
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "parse_data.py", line 19, in clean_file
    for i, line in enumerate(bzinput):
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/bz2.py", line 195, in read1
    return self._buffer.read1(size)
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/_compression.py", line 68, in readinto
    data = self.read(len(byte_view))
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/_compression.py", line 103, in read
    data = self._decompressor.decompress(rawblock, size)
OSError: Invalid data stream
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "parse_data.py", line 53, in <module>
    processed_files = list(tqdm.tqdm(p.imap(clean_file, members), total=num_files))
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/site-packages/tqdm/std.py", line 1167, in __iter__
    for obj in iterable:
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/multiprocessing/pool.py", line 735, in next
    raise value
OSError: Invalid data stream

So I guess this way of doing it isn't accessing data.tar (or the files inside it) correctly. How can I apply the function in parallel?

I'm guessing this would happen with any tar archive containing bz2 files, but here is my data to reproduce the error: https://github.com/johnf1004/reproduce_tar_error

You didn't specify what platform you are running on, but I suspect it is Windows because you have...

if __name__ == '__main__':
    main()

...which is required for code that creates processes on platforms that use the OS spawn function to create new processes. But it also means that when a new process is created (for example, all the processes in the pool you are creating), each process re-executes the source program from the top. That means every pool process is executing the following code:

tr = tarfile.open('data.tar')
members = tr.getmembers()
num_files = len(members)

That said, I don't see why this in itself would cause the error, though I can't be sure. The problem may be, however, that this code is executed after the call to the worker function, so clean_file is being invoked before tr has been set up. If this code preceded clean_file it might work, but that is just a guess. In any case, extracting the members with members = tr.getmembers() in every pool process is wasteful: each process needs to open the tar file, but ideally only once.
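To make that re-execution behaviour concrete, here is a minimal, hypothetical sketch (not your code): under the spawn start method, anything at module level runs again in every worker process, while anything under the if __name__ == '__main__': guard runs only in the main process.

import os
import tarfile
from multiprocessing import get_context

# Module-level code: under 'spawn' this runs once in the main process and
# once more in every worker process that re-imports the module.
print(f'module imported in process {os.getpid()}')

def worker(name):
    return name

if __name__ == '__main__':
    # Code in this block runs only in the main process, so the tar file
    # is opened and its member names are read just once here.
    with tarfile.open('data.tar') as tr:
        names = tr.getnames()
    with get_context('spawn').Pool(2) as pool:
        print(pool.map(worker, names))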

What is clear, though, is that the stack trace you posted does not match your code. You show:

Traceback (most recent call last):
  File "parse_data.py", line 53, in <module>
    processed_files = list(tqdm.tqdm(p.imap(clean_file, members), total=num_files))

Yet your code makes no reference to tqdm and does not use the imap method. It becomes much harder to analyze what your problem actually is when the code you post doesn't quite match the code that produced the exception.

On the chance that you are running on a Mac, which may be using fork to create new processes, this can be problematic when the main process has created multiple threads (which you don't necessarily see; they may be created, for instance, by the tarfile module) and you then create a new process. I have therefore written the code to ensure that spawn is used to create new processes. In any case, the following code should work, and it also introduces a few optimizations. If it doesn't, please post a new stack trace.

import pandas as pd
import json
import os
import bz2
import itertools
import datetime
import tarfile
from multiprocessing import get_context

def open_tar():
    # open once for each process in the pool
    global tr
    tr = tarfile.open('data.tar')

def clean_file(member):
    f = tr.extractfile(member)
    with bz2.open(f, "rt") as bzinput:
        for line in bzinput:
            line = line.replace('"name"}', '"name":" "}')
            dat = json.loads(line)
            # since you are returning just the first occurrence:
            return dat

def main():
    with tarfile.open('data.tar') as tr:
        members = tr.getmembers()
    # just pick members where '.bz2' is in member:
    filtered_members = filter(lambda member: '.bz2' in str(member), members)
    ctx = get_context('spawn')
    # open tar file just once for each process in the pool:
    with ctx.Pool(initializer=open_tar) as pool:
        processed_files = pool.map(clean_file, filtered_members)
        print(processed_files)

# required for when processes are created using spawn:
if __name__ == '__main__':
    main()
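For reference, since your traceback wraps p.imap in tqdm, here is a sketch of how main() above could keep that progress bar. It assumes tqdm is installed and reuses the open_tar, clean_file and get_context pieces from the code above; the member list is materialized so the bar knows its total.

import tqdm

def main():
    with tarfile.open('data.tar') as tr:
        members = tr.getmembers()
    # a list rather than a lazy filter, so len() works for the progress bar
    filtered_members = [m for m in members if '.bz2' in str(m)]
    ctx = get_context('spawn')
    with ctx.Pool(initializer=open_tar) as pool:
        # imap yields results lazily, so tqdm can update as each file finishes
        processed_files = list(tqdm.tqdm(pool.imap(clean_file, filtered_members),
                                         total=len(filtered_members)))
    print(processed_files)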

It seems some kind of race condition was happening. Opening the tar file separately in every child process solves the issue:

import json
import bz2
import tarfile
import logging
from multiprocessing import Pool

def clean_file(member):
    if '.bz2' not in str(member):
        return
    try:
        with tarfile.open('data.tar') as tr:
            with tr.extractfile(member) as bz2_file:
                with bz2.open(bz2_file, "rt") as bzinput:
                    dicts = []
                    for i, line in enumerate(bzinput):
                        line = line.replace('"name"}', '"name":" "}')
                        dat = json.loads(line)
                        dicts.append(dat)
                    return dicts[0]
    except Exception:
        logging.exception(f"Error while processing {member}")

def process_serial():
    tr = tarfile.open('data.tar')
    members = tr.getmembers()
    processed_files = []
    for i, member in enumerate(members):
        processed_files.append(clean_file(member))
        print(f'done {i}/{len(members)}')

def process_parallel():
    tr = tarfile.open('data.tar')
    members = tr.getmembers()
    with Pool() as pool:
        processed_files = pool.map(clean_file, members)
        print(processed_files)

def main():
    process_parallel()

if __name__ == '__main__':
    main()

EDIT:

Note that another way to solve this problem is to just use the spawn start method:

multiprocessing.set_start_method('spawn')

By doing this, we instruct Python to "deep copy" the file handles in the child processes. Under the default "fork" start method, the file handles of the parent and the child share the same offsets.
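For example, in the script above it would be enough to set the start method once in main(), before the pool is created; a minimal sketch:

import multiprocessing

def main():
    # Use 'spawn' so child processes start from a fresh interpreter instead
    # of inheriting the parent's open file handles (and their offsets).
    multiprocessing.set_start_method('spawn')
    process_parallel()

if __name__ == '__main__':
    main()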
