所以我有大量需要处理成CSV的文件。每个文件本身都很大,每一行都是一个字符串。文件的每一行可以表示三种类型的数据之一,每种类型的处理方式略有不同。我当前的解决方案如下所示:
type1_columns = [...]
type2_columns = [...]
type3_columns = [...]
file_list = os.listdir(filelist)
def process_type1_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type1_series = pd.Series(to_append, index=type1_columns)
return type1_series
def process_type2_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type2_series = pd.Series(to_append, index=type2_columns)
return type2_series
def process_type3_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type3_series = pd.Series(to_append, index=type3_columns)
return type3_series
def process_file(file):
type1_df = pd.DataFrame(columns=type1_columns)
type2_df = pd.DataFrame(columns=type2_columns)
type3_df = pd.DataFrame(columns=type3_columns)
with open(filepath/file) as f:
data=f.readlines()
for line in data:
#some logic to get the record_type and convert line to json
record_type = ...
json_line = ...
if record_type == "type1":
type1_series = process_type1_line(json_line)
type1_df = type1_df.append(type1_series, ignore_index=True)
if record_type == "type2":
type2_series = process_type2_line(json_line)
type2_df = type2_df.append(type2_series, ignore_index=True)
if record_type == "type3":
type3_series = process_type3_line(json_line)
type3_df = type3_df.append(type3_series, ignore_index=True)
type1_df.to_csv(type1_csv_path.csv)
type2_df.to_csv(type2_csv_path.csv)
type3_df.to_csv(type3_csv_path.csv)
for file in file_list:
process_file(file)
我循环访问文件,并为三种不同类型的记录中的每一种创建数据帧。我解析这些行并为每个行调用适当的处理函数。返回的序列将追加到该文件的该record_type的最终数据帧。处理完文件后,三个数据帧将另存为 CSV,我们从下一个文件开始。
问题是这种方法花费的时间太长,我需要数周时间来处理所有文件。
我试图通过使用多处理(我没有很多经验)来修改我的方法,如下所示:
with ThreadPoolExecutor(max_workers=30) as executor:
futures = [executor.submit(process_file, file) for file in file_list]
在一些日志记录打印语句中,我可以看到这开始处理 30 个文件,但没有一个文件完成,所以我至少知道我的方法有缺陷。谁能解释解决这个问题的最佳方法是什么?也许是多处理和异步的某种组合?
你有两个大问题:
您将整个输入文件加载到内存中,在内存中生成整个结果,然后一次写入整个输出文件。这意味着,如果您有 30 个工作线程并行运行,则需要与 30 个(自描述的)大文件成比例的内存。您存储所有数据也是两倍,一次是
f.readlines()
返回的str
行的list
,然后是三个DataFrame
之一;如果您使用了没有执行器的代码,则按原样更改:data=f.readlines() for line in data:
自:
for line in f:
您将立即将内存使用量减少大约一半,这(可能)足以阻止页面抖动。也就是说,您仍将使用与文件大小成比例的内存来存储
DataFrame
,因此如果您并行化代码,您将恢复抖动,并且如果文件足够大,即使没有并行性,您仍可能抖动。你对每一行都使用
.append
,IIRC来说,DataFrame
是画家施莱米尔算法的一种形式:每个append
都制作一个全新的DataFrame
,将旧DataFrame
的全部内容加上少量的新数据复制到一个新的DataFrame
中,随着现有数据越来越大,工作需要的时间越来越长;O(n)
工作应该摊销的东西变成了O(n**2)
工作。
在两者之间,您使用的内存比需要的要多得多,并且在重复的追加上执行大量不必要的繁忙工作。并行性可能有助于更快地完成繁忙的工作,但作为交换,它将您的内存需求增加了 30 倍;很有可能,你没有那么多的 RAM(如果这些文件真的很大,你可能没有足够的 RAM 来容纳其中一个文件),你最终会陷入页面抖动(将内存写出到 pagefile/swap 文件,为其他东西腾出空间,按需读回它,并经常丢弃在你完成之前分页的内存, 使内存访问与磁盘性能相关联,这比 RAM 访问慢几个数量级)。
我对Pandas的了解还不够多,无法说它是否为你正在做的事情提供了一些更好的增量解决方案,但你真的不需要一个;只需逐行使用输入,并使用csv
模块随时编写行。您的内存要求将从"与每个输入文件的大小成比例"下降到"与输入文件每一行的数据成比例"。
您的process_file
函数最终将如下所示:
def process_file(file):
# Open input file and all output files (newline='' needed to play nice with csv module
# which takes control of newline format to ensure dialect rules followed precisely,
# regardless of OS line separator rules)
with open(filepath/file) as f,
open(type1_csv_path, 'w', newline='') as type1f,
open(type2_csv_path, 'w', newline='') as type2f,
open(type3_csv_path, 'w', newline='') as type3f:
csv1 = csv.writer(type1f)
csv1.writerow(type1_columns) # Omit if no useful column header
csv2 = csv.writer(type2f)
csv2.writerow(type2_columns) # Omit if no useful column header
csv3 = csv.writer(type3f)
csv3.writerow(type3_columns) # Omit if no useful column header
for line in f: # Directly iterating file object lazily fetches line at a time
# where .readlines() eagerly fetches whole file, requiring
# a ton of memory for no reason
#some logic to get the record_type and convert line to json
record_type = ...
json_line = ...
if record_type == "type1":
type1_series = process_type1_line(json_line)
csv1.writerow(type1_series) # Might need to convert to plain list if Series
# doesn't iterate the values you need directly
elif record_type == "type2":
type2_series = process_type2_line(json_line)
csv2.writerow(type2_series)
elif record_type == "type3":
type3_series = process_type3_line(json_line)
csv3.writerow(type3_series)
如果这按原样工作(没有执行程序),请以这种方式使用它。如果您即使没有执行程序也会进行页面抖动,或者文件足够大以至于重复的append
严重伤害了您,这可能足以使其自行工作。如果它太慢,执行程序可能会提供一点好处,如果你做了很多工作来将每一行处理成输出格式(因为当大多数工作线程正在处理时,一个或两个工作线程可以充分共享磁盘访问以进行读取和写入),但如果每行的处理工作量很低,任何超过少数工作线程(我从两三个开始)只会增加磁盘争用(特别是如果你是使用旋转磁盘硬盘驱动器,而不是SSD),并行性要么无济于事,要么会主动损害。
您可能需要调整使用的确切CSV方言(作为参数传递给csv.writer
),并可能为输出文件显式设置特定encoding
,而不是区域设置默认值(例如,将encoding='utf-8'
或encoding='utf-16'
传递给open
进行写入,因此它始终以.csv
文件的使用者期望的编码写入), 但这是一般的形式。