Python multiprocessing is slow when processing a list, even when using shared memory



How is it that the non-parallelised version of this code runs so much faster than the parallel version, given that I am using a Manager object to share the list across processes? I did that to avoid any serialisation, and I don't need to edit the list.

I fetch an 800,000-row dataset from Oracle, convert it to a list and store it in shared memory using Manager.list().

I iterate over each column of the query result in parallel to gather some statistics (I know I could do this in SQL).

Main code:

import cx_Oracle
import csv
import os
import glob
import datetime
import multiprocessing as mp
import get_column_stats as gs;
import pandas as pd
import pandas.io.sql as psql

def get_data():
    print("Starting Job: " + str(datetime.datetime.now()));
    manager = mp.Manager()
    # Step 1: Init multiprocessing.Pool()   
    pool = mp.Pool(mp.cpu_count())
    print("CPU Count: " + str(mp.cpu_count()))
    dsn_tns = cx_Oracle.makedsn('myserver.net', '1521', service_name='PARIELGX');
    con = cx_Oracle.connect(user='fred', password='password123', dsn=dsn_tns);

    stats_results = [["OWNER","TABLE","COLUMN_NAME","RECORD_COUNT","DISTINCT_VALUES","MIN_LENGTH","MAX_LENGTH","MIN_VAL","MAX_VAL"]];
    sql = "SELECT * FROM ARIEL.DIM_REGISTRATION_SET"
    cur = con.cursor();
    print("Start Executing SQL: " + str(datetime.datetime.now()));
    cur.execute(sql);
    print("End SQL Execution: " + str(datetime.datetime.now()));
    print("Start SQL Fetch: " + str(datetime.datetime.now()));
    rs = cur.fetchall();
    print("End SQL Fetch: " + str(datetime.datetime.now()));
    print("Start Creation of Shared Memory List: " + str(datetime.datetime.now()));    
    lrs = manager.list(list(rs)) # shared memory list
    print("End Creation of Shared Memory List: " + str(datetime.datetime.now()));
    col_names = [];
    for field in cur.description:   
        col_names.append(field[0]); 
    #print(col_names)
    #print('-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-')
    #print(rs)
    #print('-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-')
    #print(lrs)


    print("Start In-Memory Iteration of Dataset: " + str(datetime.datetime.now()));
    # iterate through each column, to gather stats from each column using parallelisation
    pool_results = pool.map_async(gs.get_column_stats_rs, [(lrs, col_name, col_names) for col_name in col_names]).get()
    for i in pool_results:
        stats_results.append(i)
    # Step 3: Don't forget to close
    pool.close() 

    print("End In-Memory Iteration of Dataset: " + str(datetime.datetime.now()));
    # end filename for
    cur.close();           
    outfile = open(r'C:\jupyter\Experiment\stats_dim_registration_set.csv', 'w');
    writer = csv.writer(outfile, quoting=csv.QUOTE_ALL, lineterminator='\n');
    writer.writerows(stats_results);
    outfile.close()
    print("Ending Job: " + str(datetime.datetime.now()));


if __name__ == '__main__':
    get_data();

Code called in parallel:

import sys

def get_column_stats_rs(args):
    # rs is a list recordset of the results
    rs, col_name, col_names = args
    col_index = col_names.index(col_name)
    sys.stdout = open("col_" + col_name + ".out", "a")
    print("Starting Iteration of Column: " + col_name)
    max_length = 0
    min_length = 100000  # arbitrarily large number!!
    max_value = ""
    min_value = "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"  # abitrarily large number!!
    distinct_value_count = 0
    has_values = False  # does the column have any non-null values
    has_null_values = False
    row_count = 0
    # create a dictionary into which we can add the individual items present in each row of data
    # a dictionary will not let us add the same value more than once, so we can simply count the
    # dictionary values at the end
    distinct_values = {}
    row_index = 0
    # go through every row, for the current column being processed to gather the stats
    for val in rs:
        row_value = val[col_index]
        row_count += 1
        if row_value is None:
            value_length = 0
        else:
            value_length = len(str(row_value))
        if value_length > max_length:
            max_length = value_length
        if value_length < min_length:
            if value_length > 0:
                min_length = value_length
        if row_value is not None:
            if str(row_value) > max_value:
                max_value = str(row_value)
            if str(row_value) < min_value:
                min_value = str(row_value)
        # capture distinct values
        if row_value is None:
            row_value = "Null"
            has_null_values = True
        else:
            has_values = True
            distinct_values[row_value] = 1
        row_index += 1
        # end row for
    distinct_value_count = len(distinct_values)
    if has_values == False:
        distinct_value_count = None
        min_length = None
        max_length = None
        min_value = None
        max_value = None
    elif has_null_values == True and distinct_value_count > 0:
        distinct_value_count -= 1
    if min_length == 0 and max_length > 0 and has_values == True:
        min_length = max_length
    print("Ending Iteration of Column: " + col_name)
    return ["ARIEL", "DIM_REGISTRATION_SET", col_name, row_count, distinct_value_count, min_length, max_length,
        strip_crlf(str(min_value)), strip_crlf(str(max_value))]

Helper function:

def strip_crlf(value):
    return value.replace('\n', ' ').replace('\r', '')

I am using a Manager.list() object to share state between the processes:

lrs = manager.list(list(rs)) # shared memory list

and passing the list in the map_async() call:

pool_results = pool.map_async(gs.get_column_stats_rs, [(lrs, col_name, col_names) for col_name in col_names]).get()

The Manager overhead is adding to your runtime. Also, you are not using shared memory directly here. You are only using the multiprocessing Manager, which is known to be slower than shared memory or a single-threaded implementation. If you don't need synchronisation in your code, meaning you are not modifying the shared data, just skip the Manager and use a shared-memory object directly.
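
To make that cost concrete, here is a minimal timing sketch (not from the original code) that I'd expect to show the difference: iterating a plain local list reads straight from memory, whereas iterating a Manager.list proxy performs a separate round trip to the manager process for every item it reads.

import multiprocessing as mp
import time

if __name__ == '__main__':
    # stand-in for a fetched result set (10,000 small rows)
    data = [(i, "some text", i * 1.5) for i in range(10_000)]

    start = time.perf_counter()
    for row in data:                 # plain list: direct memory access
        _ = row[0]
    print("local list  :", time.perf_counter() - start)

    manager = mp.Manager()
    shared = manager.list(data)      # proxy to a list living in a separate manager process

    start = time.perf_counter()
    for row in shared:               # proxy: one IPC round trip per item read
        _ = row[0]
    print("manager list:", time.perf_counter() - start)

With 800,000 rows read once per column, those per-item round trips dominate the runtime.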

https://docs.python.org/3.7/library/multiprocessing.html

Server process managers are more flexible than using shared memory objects because they can be made to support arbitrary object types. Also, a single manager can be shared by processes on different computers over a network. They are, however, slower than using shared memory.
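
Following that advice, one way to drop the Manager entirely is to hand the fetched rows to each worker once via a Pool initializer. This is a minimal sketch with assumed helper names (_init_worker, _column_stats), not the original get_column_stats_rs: each worker then reads its own copy from ordinary memory; on Linux with the fork start method the data is shared copy-on-write, while on Windows each worker pays a single pickling cost at startup instead of one round trip per element access.

import multiprocessing as mp

_rows = None        # per-worker reference to the result set (hypothetical globals)
_col_names = None

def _init_worker(rows, col_names):
    # runs once in every worker process; stores the data in module globals
    global _rows, _col_names
    _rows = rows
    _col_names = col_names

def _column_stats(col_name):
    # compute per-column statistics in the same spirit as get_column_stats_rs,
    # but reading _rows directly instead of a Manager proxy
    col_index = _col_names.index(col_name)
    non_null = [row[col_index] for row in _rows if row[col_index] is not None]
    return [col_name, len(_rows), len(set(non_null))]

if __name__ == '__main__':
    rs = [(1, 'a'), (2, 'b'), (3, 'a')]          # stands in for cur.fetchall()
    col_names = ['ID', 'NAME']
    with mp.Pool(mp.cpu_count(),
                 initializer=_init_worker,
                 initargs=(rs, col_names)) as pool:
        results = pool.map(_column_stats, col_names)
    print(results)

Passing 800,000 rows through initargs still copies the data to each worker on Windows, but that is a one-time cost per worker rather than one IPC call per row per column.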
