如何使用python-multiprocessing来连接许多文件/数据帧?



我对python和编程比较陌生,只是用它来分析模拟数据。我有一个目录"result_1/"有超过150000个CSV文件与模拟数据,我想连接到一个pandas-dataFrame。为了避免readdir()一次只读取32K目录条目的问题,我准备了"files.csv"-列出目录下的所有文件。

("sim", "det", "run"是我从文件名中读取的信息片段,并作为Series插入到dataFrame中。为了更好地忽略,我从concat中去掉了它们的定义。)

我的问题如下:这个程序需要太多的时间来运行,我想使用多处理/线程来加速for循环,但由于我以前从未使用过mp/mt,我甚至不知道它是否或如何在这里使用。 提前谢谢你,祝你有美好的一天!
import numpy as np                          
import pandas as pd                         
import os
import multiprocessing as mp
df = pd.DataFrame()
path = 'result_1/'
list = pd.read_csv('files.csv', encoding='utf_16_le', names=['f'])['f'].values.tolist()
for file in list:
dftemp = pd.read_csv(r'{}'.format(os.path.join(path, file)), skiprows=8, names=['x', 'y', 'z', 'dos'], sep=',').drop(['y', 'z'], axis=1)
sim = pd.Series(int(file.split('Nr')[1].split('_')[0]) * np.ones((300,), dtype=int))
det = pd.Series(int(file.split('Nr')[0]) * np.ones((300,), dtype=int))
run = pd.Series(int(file[-8:-4]) * np.ones((300,), dtype=int))
dftemp = pd.concat([sim, det, run, dftemp], axis=1)
df = pd.concat([df, dftemp], axis=0)
df.rename({0:'sim', 1:'det', 2:'run', 3:'x', 4:'dos'}, axis=1).to_csv(r'df.csv')

CSV文件看起来像这样:"193Nr6_Run_0038.csv"(初版)

#(8 lines of things I don't need.)
0, 0, 0, 4.621046656438921e-09
1, 0, 0, 4.600856584602298e-09
(... 300 lines of data [x, y, z, dose])

由于CPU和RAM的限制,并行处理dataframe可能很困难。我不知道你的硬件规格,也不知道你的dataframe的细节。但是,我会使用多处理来"解析/生成"。数据框架,然后将它们连接起来。下面是一个例子:

import numpy as np                          
import pandas as pd                         
import os
from multiprocessing import Pool

path = 'result_1/'
list_of_files = pd.read_csv('files.csv', encoding='utf_16_le', names=['f'])['f'].values.tolist()
#make a function to replace the for-loop:
def my_custom_func(file):
dftemp = pd.read_csv(r'{}'.format(os.path.join(path, file)), skiprows=8, names=['x', 'y', 'z', 'dos'], sep=',').drop(['y', 'z'], axis=1)
sim = pd.Series(int(file.split('Nr')[1].split('_')[0]) * np.ones((300,), dtype=int))
det = pd.Series(int(file.split('Nr')[0]) * np.ones((300,), dtype=int))
run = pd.Series(int(file[-8:-4]) * np.ones((300,), dtype=int))
return pd.concat([sim, det, run, dftemp], axis=1)
#use multiprocessing to process multiple files at once
with Pool(8) as p: #8 processes simultaneously. Avoid using more processes than cores in your CPU
dataframes = p.map(my_custom_func, list_of_files)
#Finally, concatenate them all
df = pd.concat(dataframes)
df.rename({0:'sim', 1:'det', 2:'run', 3:'x', 4:'dos'}, axis=1).to_csv(r'df.csv')

查看multiprocessing.Pool()获取更多信息

最新更新