我有工作代码可以从大型数据框架编写以在Excel文件中分离床单,但大约需要30-40分钟。我想找到一种使用多处理速度更快运行的方法。
我尝试使用多处理将其重写,以便可以与多个处理器并行完成每个Excel选项卡。修订后的代码在没有错误的情况下运行,但也没有正确地写入Excel文件。任何建议都会有所帮助。
代码的原始工作部分:
import os
from excel_writer import append_df_to_excel
import pandas as pd
path = os.path.dirname(
os.path.abspath(__file__)) + '\fund_data.xlsx' # get path to current directory and excel filename for data
data_cols = df_all.columns.values.tolist() # Create a list of the columns in the final dataframe
# print(data_cols)
for column in data_cols: # For each column in the dataframe
df_col = df_all[column].unstack(level = -1) # unstack so Dates are across the top oldest to newest
df_col = df_col[df_col.columns[::-1]] # reorder for dates are newest to oldest
# print(df_col)
append_df_to_excel(path, df_col, sheet_name = column, truncate_sheet = True,
startrow = 0) # Add data to excel file
修订的代码尝试多处理:
import os
from excel_writer import append_df_to_excel
import pandas as pd
import multiprocessing
def data_to_excel(col, excel_fn, data):
data_fr = pd.DataFrame(data) # switch list back to dataframe for putting into excel file sheets
append_df_to_excel(excel_fn, data_fr, sheet_name = col, truncate_sheet = True, startrow = 0) # Add data to sheet in excel file
if __name__ == "__main__":
path = os.path.dirname(
os.path.abspath(__file__)) + '\fund_data.xlsx' # get path to current directory and excel filename for data
data_cols = df_all.columns.values.tolist() # Create a list of the columns in the final dataframe
# print(data_cols)
pool = multiprocessing.Pool(processes = multiprocessing.cpu_count())
for column in data_cols: # For each column in the dataframe
df_col = df_all[column].unstack(level = -1) # unstack so Dates are across the top oldest to newest
df_col = df_col[df_col.columns[::-1]] # reorder for dates are newest to oldest
# print(df_col)
data_col = df_col.values.tolist() # convert dataframe coluumn to a list to use in pool
pool.apply_async(data_to_excel, args = (column, path, data_col))
pool.close()
pool.join()
我不知道从多个进程写入单个文件的正确方法。我需要解决类似的问题。我使用创建作者进程来解决它,该过程使用队列获取数据。您可以在这里看到我的解决方案(对不起,它没有记录下来(。
简化版本(草稿(
from multiprocessing import Queue
input_queue = Queue()
res_queue = Queue()
process_list = []
def do_calculation(input_queue, res_queue, calculate_function):
try:
while True:
data = in_queue.get(False)
try:
res = calculate_function(**data)
out_queue.put(res)
except ValueError as e:
out_queue.put("fail")
logging.error(f" fail on {data}")
except queue.Empty:
return
# put data in input queue
def save_process(out_queue, file_path, count):
for i in range(count):
data = out_queue.get()
if data == "fail":
continue
# write to excel here
for i in range(process_num):
p = Process(target=do_calculation, args=(input_queue, res_queue, calculate_function))
p.start()
process_list.append(p)
p2 = Process(target=save_process, args=(res_queue, path_to_excel, data_size))
p2.start()
p2.join()
for p in process_list:
p.join()