如何将多个文件读取到多个线程/过程中以优化数据分析



我正在尝试读取python中的3个不同文件,并做点什么以提取数据。然后我想将数据合并到一个大文件中。

由于每个单独的文件已经很大,并且有时需要进行数据处理,所以我在想

是否
  • 我可以一次读取所有三个文件(在多个线程/过程中(
  • 等待所有文件完成的过程
  • 当所有输出准备就绪时,然后将所有数据输送到下游函数以将其合并。

有人可以建议对此代码做一些改进以完成我想做的事情。

import pandas as pd
file01_output = ‘’
file02_output = ‘’
file03_output = ‘’
# I want to do all these three “with open(..)” at once.
with open(‘file01.txt’, ‘r’) as file01:
    for line in file01:
        something01 = do something in line
        file01_output += something01
with open(‘file02.txt’, ‘r’) as file01:
    for line in file01:
        something02 = do something in line
        file02_output += something02
with open(‘file03.txt’, ‘r’) as file01:
    for line in file01:
        something03 = do something in line
        file03_output += something03
def merge(a,b,c):
    a = file01_output
    b = file01_output
    c = file01_output
    # compile the list of dataframes you want to merge
    data_frames = [a, b, c]
    df_merged = reduce(lambda  left,right: pd.merge(left,right,
                       on=['common_column'], how='outer'), data_frames).fillna('.')

有很多方法可以在您的问题中使用多处理,因此我只提出一种方式。正如您提到的,由于文件中的数据上发生的处理是CPU绑定的,因此您可以在单独的过程中运行并期望看到一些改进(如果有的话,取决于问题,算法,#cores,ETC。(。例如,整体结构看起来就像只有一个pool,您需要该map列表,您需要处理的所有filenames列表,并且在该功能中您可以进行计算。

使用一个具体示例更容易。让我们假装我们有一个具有NUMBER列的CSVS 'file01.csv', 'file02.csv', 'file03.csv'列表,我们想计算该数字是否为Prime(CPU绑定(。例如,file01.csv

NUMBER
1
2
3
...

其他文件看起来相似,但具有不同的数字以避免重复工作。计算素数的代码然后看起来像这样:

import pandas as pd
from multiprocessing import Pool
from sympy import isprime
def compute(filename):
    # IO (probably not faster)
    my_data_df = pd.read_csv(filename)
    # do some computing (CPU)
    my_data_df['IS_PRIME'] = my_data_df.NUMBER.map(isprime)
    return my_data_df
if __name__ == '__main__':
    filenames = ['file01.csv', 'file02.csv', 'file03.csv']
    # construct the pool and map to the workers
    with Pool(2) as pool:
        results = pool.map(compute, filenames)
    print(pd.concat(results))

我已经将sympy软件包用于方便的isprime方法,我确定数据的结构完全不同,但希望该示例也说明了您也可以使用的结构。在pool(或Process ES列表(中执行CPU绑定的计算然后合并/降低/串联结果的计划是解决问题的合理方法。

最新更新