有没有很好的方法来避免内存深度复制或减少在多处理中花费的时间



我正在使用Python环境的Pandas模块制作一个基于内存的"大数据"实时计算模块。

所以响应时间是这个模块的质量,非常关键和重要。

为了处理大型数据集,我拆分数据并并行处理子拆分数据。

在存储子数据结果的部分,花费了很多时间(第21行)。

我认为内部内存深度复制出现或传递的子数据未在内存中共享。

如果我用 C 或 C++ 编写模块,我将使用如下所示的指针或引用。

"process=Process(target=addNewDerivedColumn, args=[resultList, &sub_dataframe])"

"process=Process(target=addNewDerivedColumn, args=[resultList, sub_dataframe])

def addNewDerivedColumn(resultList, split_sub_dataframe&):...."

有没有一种好方法可以避免内存深度复制或减少多处理所花费的时间?"不优雅"很好。我已经准备好弄脏我的代码了。我尝试了weekref,RawValue,RawArray,Value,Pool,但都失败了。

该模块正在MacOS中开发,最终将在Linux或Unix中运行。

不要考虑Windows操作系统。

代码来了。

真正的代码在我的办公室里,但结构和逻辑与真实代码相同。

1 #-*- coding: UTF-8 -*-' 
2 import pandas as pd
3 import numpy as np
4 from multiprocessing import *
5 import time
6
7
8 def addNewDerivedColumn(resultList, split_sub_dataframe):
9    
10    split_sub_dataframe['new_column']=    np.abs(split_sub_dataframe['column_01']+split_sub_dataframe['column_01']) / 2
11    
12    print split_sub_dataframe.head()
13    
14    '''
15     i think that the hole result of sub-dataframe is copied to resultList, not reference value 
16     and in here time spend much
17     compare elapsed time of comment 21th line with the uncommented one
18     In MS Windows, signifiant difference of elapsed time doesn't show up
19     In Linux or Mac OS, the difference is big
20    '''
21    resultList.append(split_sub_dataframe)
22    
23
24
25 if __name__ == "__main__":
26    
27    # example data generation
28    # the record count of the real data is over 1 billion with about 10 columns.
29    dataframe = pd.DataFrame(np.random.randn(100000000, 4), columns=['column_01', 'column_02', 'column_03', 'column_04'])
30    
31
32    print 'start...'
33    start_time = time.time()
34    
35    # to launch 5 process in parallel, I split the dataframe to five sub-dataframes
36    split_dataframe_list = np.array_split(dataframe, 5)
37    
38    # multiprocessing 
39    manager = Manager()
40    
41    # result list
42    resultList=manager.list()
43    processList=[]
44    
45    for sub_dataframe in split_dataframe_list:
46        process=Process(target=addNewDerivedColumn, args=[resultList, sub_dataframe])
47        processList.append(process)
48        
49    for proc in processList: 
50        proc.start()
51    for proc in processList: 
52        proc.join()
53    
54    
55    print 'elapsed time  : ', np.round(time.time() - start_time,3)

如果你保持进程间通信到最低。因此,与其将子数据帧作为参数传递,不如传递索引值。子进程可以对公共数据帧本身进行切片。

生成子进程时,它会获取父进程的调用模块。因此,如果大型数据帧 df 是在生成多处理池之前在全局变量中定义,然后每个生成的子进程将有权访问df

在没有fork()的Windows上,启动一个新的python进程,并且调用模块已导入。因此,在 Windows 上,生成的子进程必须从头开始重新生成df,这可能需要时间和大量额外内存。

但是,在 Linux 上,您有写入时复制。这意味着生成的子进程访问(调用模块的)原始全局变量,而无需复制它们。仅当子进程尝试修改全局时,Linux 才然后在修改值之前创建单独的副本。

因此,如果您避免修改全局变量,则可以享受性能提升子流程。 我建议仅将子过程用于计算。返回值的计算,并让主进程整理结果进行修改原始数据帧。

import pandas as pd
import numpy as np
import multiprocessing as mp
import time
def compute(start, end):
    sub = df.iloc[start:end]
    return start, end, np.abs(sub['column_01']+sub['column_01']) / 2
def collate(retval):
    start, end, arr = retval
    df.ix[start:end, 'new_column'] = arr
def window(seq, n=2):
    """
    Returns a sliding window (of width n) over data from the sequence
    s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...
    """
    for i in range(len(seq)-n+1):
        yield tuple(seq[i:i+n])
if __name__ == "__main__":
    result = []
    # the record count of the real data is over 1 billion with about 10 columns.
    N = 10**3
    df = pd.DataFrame(np.random.randn(N, 4),
                      columns=['column_01', 'column_02', 'column_03', 'column_04'])
    pool = mp.Pool()    
    df['new_column'] = np.empty(N, dtype='float')
    start_time = time.time()
    idx = np.linspace(0, N, 5+1).astype('int')
    for start, end in window(idx, 2):
        # print(start, end)
        pool.apply_async(compute, args=[start, end], callback=collate)
    pool.close()
    pool.join()
    print 'elapsed time  : ', np.round(time.time() - start_time,3)
    print(df.head())

受到这个问题和@unutbu答案的启发,我在github上写了一个并行版本的map。该功能适用于在具有多个内核的单机中对只读大数据结构进行无限并行处理。基本思想类似于@unutbu,使用临时全局变量来保存大数据结构(例如,数据框),并将其"名称"而不是变量本身传递给worker。但所有这些都封装在一个地图函数中,因此在 pathos 包的帮助下,它几乎是标准地图函数的直接替代品。示例用法如下,

# Suppose we process a big dataframe with millions of rows.
size = 10**9
df = pd.DataFrame(np.random.randn(size, 4),
                  columns=['column_01', 'column_02', 
                           'column_03', 'column_04'])
# divide df into sections of 10000 rows; each section will be
# processed by one worker at a time
section_size = 10000
sections = [xrange(start, start+section_size) 
            for start in xrange(0, size, section_size)]
# The worker function that processes one section of the
# df. The key assumption is that a child 
# process does NOT modify the dataframe, but do some 
# analysis or aggregation and return some result.
def func(section, df):
    return some_processing(df.iloc[section])
num_cores = 4
# sections (local_args) specify which parts of a big object to be processed;
# global_arg holds the big object to be processed to avoid unnecessary copy;
# results are a list of objects each of which is the processing results 
# of one part of a big object (i.e., one element in the iterable sections) 
# in order.
results = map(func, sections, global_arg=df,
              chunksize=10, 
              processes=num_cores)
# reduce results (assume it is a list of data frames)
result = pd.concat(results)

在我的一些文本挖掘任务中,由于大数据帧的昂贵复制操作,将 df 直接传递给 worker 函数的朴素并行实现甚至比单线程版本慢。但是,上述实现可以为具有 4 个内核的任务提供>3 倍的加速,这似乎非常接近真正的轻量级多线程。

最新更新