Parallelizing a loop over a DataFrame



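I want to parallelize the code below. For certain reasons I have to subset the DataFrame and then apply a function to each subset. Note that the subsets will not all be the same size.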

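# df and func are defined elsewhere; pandas is imported as pd
results = []
for i in range(df['col1'].max() + 1):
    subset = df[df['col1'] == i]      # rows whose col1 equals i
    results.append(func(subset))
# DataFrame.append was removed in pandas 2.0, so concatenate once at the end
result = pd.concat(results)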

Try the following code, which uses multiprocessing:

import multiprocessing

def f(x):
    return x * x

def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

if __name__ == '__main__':
    n_core = multiprocessing.cpu_count()
    data = range(0, 8)
    subsets = chunks(data, n_core)
    subset_results = []
    # The with block shuts the pool down once all chunks are processed
    with multiprocessing.Pool(processes=n_core) as p:
        for subset in subsets:
            # Each chunk is mapped across the worker processes
            subset_results.append(p.map(f, subset))
    print(subset_results)
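The chunking step can be checked on its own, without starting any worker processes. The sketch below reuses `chunks` and `f` from the answer with a fixed chunk size of 3 (an arbitrary value chosen for illustration; the answer uses `n_core`) and lets the built-in `map` stand in for `p.map`:

```python
def f(x):
    return x * x

def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

data = range(0, 8)
# Chunk size 3 is an illustrative assumption, not the value used in the answer
subsets = [list(chunk) for chunk in chunks(data, 3)]
# Built-in map stands in for Pool.map, which also returns results in input order
subset_results = [list(map(f, chunk)) for chunk in subsets]

print(subsets)         # [[0, 1, 2], [3, 4, 5], [6, 7]]
print(subset_results)  # [[0, 1, 4], [9, 16, 25], [36, 49]]
```

The last chunk is shorter than the others, so the same pattern copes with subsets of uneven size.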

In your case, a chunk function that does the job could look like this:

def chunks_series(s):
    subsets = []
    for i in range(s.max() + 1):
        subset = s[s == i]
        subsets.append(subset.values)
    return subsets

subsets = chunks_series(df['col1'])
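As a quick check of what `chunks_series` returns, here is a sketch on a small hand-made `Series` (the sample values are assumptions chosen for illustration). Each subset holds the `col1` values equal to `i`, so the subsets have different lengths, and a value of `i` that never occurs produces an empty array:

```python
import pandas as pd

def chunks_series(s):
    subsets = []
    for i in range(s.max() + 1):
        subset = s[s == i]
        subsets.append(subset.values)
    return subsets

# Hypothetical sample data: the value 2 never occurs
s = pd.Series([0, 1, 1, 3, 3, 3], name='col1')
subsets = chunks_series(s)

print([len(subset) for subset in subsets])      # [1, 2, 0, 3]
print([subset.tolist() for subset in subsets])  # [[0], [1, 1], [], [3, 3, 3]]
```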

Alternatively, you can do everything in a single loop:

n_core = multiprocessing.cpu_count()
p = multiprocessing.Pool(processes=n_core)
s = df['col1']
subset_results = []
for i in range(s.max() + 1):
    subset = s[s == i]
    subset_results.append(p.map(f, subset))

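I would rather introduce a chunk function, even though it brings no advantage in your case, because it makes the code clearer and more general.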
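If the function needs the full rows of each subset rather than only the `col1` values, one possible variant is to build the subsets with `DataFrame.groupby` and send each one to the pool as a single task. This is a sketch under assumptions, not the method given above: `apply_by_group`, the sample `func`, the `value` column, and the sample data are all hypothetical, and `func` is assumed to take a DataFrame and return a DataFrame.

```python
import multiprocessing

import pandas as pd

def func(subset):
    # Hypothetical per-subset function: one summary row per subset
    return pd.DataFrame({'col1': [subset['col1'].iloc[0]],
                         'total': [subset['value'].sum()]})

def apply_by_group(df, func, key='col1', pool=None):
    """Apply func to each subset of df that shares a key value, then concatenate."""
    # groupby yields only the key values that actually occur, in sorted order,
    # so empty subsets are skipped (unlike the range-based loop)
    subsets = [group for _, group in df.groupby(key)]
    # Use the pool when one is given; otherwise fall back to a serial map
    mapper = pool.map if pool is not None else map
    return pd.concat(list(mapper(func, subsets)), ignore_index=True)

# Hypothetical sample data with subsets of different sizes
df = pd.DataFrame({'col1': [0, 1, 1, 3, 3, 3],
                   'value': [10, 20, 30, 40, 50, 60]})

if __name__ == '__main__':
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as p:
        result = apply_by_group(df, func, pool=p)
    print(result)
```

Each subset is pickled and sent to a worker process, so this tends to pay off only when `func` is expensive relative to the cost of copying the subset.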
