Concatenating results after multiprocessing



I have a function that creates a data frame by running multiprocessing over a df:

Suppose my df has 10 rows; the processor function will then process each of the 10 rows separately. What I want is to concatenate all of the processor function's outputs into a single data frame.

import sys
import multiprocessing
from multiprocessing import Pool

import numpy as np
import pandas as pd


def processor(dff):
    """
    reading data from a data frame and doing all sorts of data manipulation
    for multiprocessing
    """
    return dff

def main(infile, mdebug):
    global debug
    debug = mdebug
    try:
        lines = sum(1 for line in open(infile))
    except Exception as err:
        print("Error {} opening file: {}".format(err, infile))
        sys.exit(2000)
    if debug >= 2:
        print(infile)
    try:
        dff = pd.read_csv(infile)
    except Exception as err:
        print("Error {}, opening file: {}".format(err, infile))
        sys.exit(2000)
    df_split = np.array_split(dff, (lines + 1))
    cores = multiprocessing.cpu_count()
    cores = 64
    # pool = Pool(cores)
    pool = Pool(lines - 1)
    for n, frame in enumerate(pool.imap(processor, df_split), start=1):
        if frame is not None:
            frame.to_csv('{}'.format(n))
    pool.close()
    pool.join()


if __name__ == "__main__":
    args = parse_args()
    """
    print "Debug is: {}".format(args.debug)
    """
    if args.debug >= 1:
        print("Running in debug mode: {}".format(args.debug))
    main(infile=args.infile, mdebug=args.debug)

You can solve this either with the DataFrame constructor or with pd.concat. Which one is more appropriate depends on details of your code that you haven't included.

Here is a more complete example:

import numpy as np
import pandas as pd
from multiprocessing import Pool

# create dummy dataset
dff = pd.DataFrame(np.random.rand(101, 5), columns=list('abcde'))

# process data: run processor (defined above) on each chunk in parallel
with Pool() as pool:
    result = pool.map(processor, np.array_split(dff, 7))

# put it all back together in one dataframe
result = pd.concat(result)
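
If you instead go with the DataFrame constructor mentioned above, each worker would return something the constructor understands (for example a dict or list per chunk) rather than a DataFrame. A minimal sketch, assuming a hypothetical row_summary worker that returns one dict per chunk:

import numpy as np
import pandas as pd
from multiprocessing import Pool

def row_summary(chunk):
    # hypothetical worker: returns a plain dict per chunk instead of a DataFrame
    return {'rows': len(chunk), 'a_mean': chunk['a'].mean()}

if __name__ == "__main__":
    dff = pd.DataFrame(np.random.rand(101, 5), columns=list('abcde'))
    with Pool() as pool:
        records = pool.map(row_summary, np.array_split(dff, 7))
    # the DataFrame constructor turns the list of dicts into one frame,
    # one row per processed chunk
    summary = pd.DataFrame(records)
    print(summary)

Either way, the key point is to collect the return values from pool.map / pool.imap into a list first and build the final data frame in one step, instead of writing each piece to its own CSV as in your current loop.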
