如何减少Python多处理的时间



我正在尝试在Python中构建多处理以降低计算速度,但是在多处理后,总体计算速度显着降低。我已经创建了4个不同的过程,并将数据框架分为4个不同的数据框,这将是每个过程的输入。在每个过程定时之后,似乎高架成本很大,并且想知道是否有降低这些间接费用的方法。

我正在使用Windows7,Python 3.5,我的机器有8个内核。

def doSomething(args, dataPassed,):
    processing data, and calculating outputs
def parallelize_dataframe(df, nestedApply):
    df_split = np.array_split(df, 4)
    pool = multiprocessing.Pool(4)
    df = pool.map(nestedApply, df_split)
    print ('finished with Simulation')
    time = float((dt.datetime.now() - startTime).total_seconds())
    pool.close()
    pool.join()
def nestedApply(df):
    func2 = partial(doSomething, args=())
    res = df.apply(func2, axis=1)
    res =  [output Tables]
    return res
if __name__ == '__main__':
data = pd.read_sql_query(query, conn)
parallelize_dataframe(data, nestedApply)

我建议使用队列,而不是提供数据框架作为块。您需要大量的Ressources来复制每个块,并且需要花费很多时间。如果您的数据框架真的很大,则可能会用尽内存。使用队列您可以从熊猫中的快速迭代器中受益。这是我的方法。高架因工人的复杂性而降低。不幸的是,我的工人真的很容易表明这一点,但是sleep模拟了一些复杂性。

import pandas as pd
import multiprocessing as mp
import numpy as np
import time

def worker(in_queue, out_queue):
    for row in iter(in_queue.get, 'STOP'):
        value = (row[1] * row[2] / row[3]) + row[4]
        time.sleep(0.1)
        out_queue.put((row[0], value))
if __name__ == "__main__":
    # fill a DataFrame
    df = pd.DataFrame(np.random.randn(1e5, 4), columns=list('ABCD'))
    in_queue = mp.Queue()
    out_queue = mp.Queue()
    # setup workers
    numProc = 2
    process = [mp.Process(target=worker,
                          args=(in_queue, out_queue)) for x in range(numProc)]
    # run processes
    for p in process:
        p.start()
    # iterator over rows
    it = df.itertuples()
    # fill queue and get data
    # code fills the queue until a new element is available in the output
    # fill blocks if no slot is available in the in_queue
    for i in range(len(df)):
        while out_queue.empty():
            # fill the queue
            try:
                row = next(it)
                in_queue.put((row[0], row[1], row[2], row[3], row[4]), block=True)  # row = (index, A, B, C, D) tuple
            except StopIteration:
                break
        row_data = out_queue.get()
        df.loc[row_data[0], "Result"] = row_data[1]
    # signals for processes stop
    for p in process:
        in_queue.put('STOP')
    # wait for processes to finish
    for p in process:
        p.join()

使用numProc = 2,每循环需要50秒,使用numProc = 4,它的速度是两倍。

相关内容

  • 没有找到相关文章

最新更新