我必须在数据帧上执行大量操作,使用单个内核需要很长时间。我正在尝试实现多处理。
现在,当我试图弄清楚它是如何工作的时,所以我使用的是更简单的版本,我只想从数据中添加值
import multiprocessing
import pandas as pd
def add_values(a):
df = pd.DataFrame([{'n':a}])
return df
df = pd.DataFrame([{'n':0}])
data = [9, 4, 5]
with multiprocessing.Pool(processes=4) as pool:
df = df.add(pool.map(add_values, data))
df
我希望 df 返回一个 n=18 的数据帧,但我收到此错误消息 值错误:无法强制到系列,长度必须为 1:给定 3
这里的问题是如何处理多处理调用的返回值。 pool.map()
返回一个list
。在这种特殊情况下,它将是一个数据帧列表,即您的调用扩展到什么等同于 df = df.add([dfn9, dfn4, dfn5])
,其中dfnX
是不同的数据帧。
这个输入既不是预期的,也不是由df.add()
处理的,它期望一些东西可以变成一个pd.Series
的对象并添加到原始帧中。相反,您需要获取此列表并"手动"减少它,例如:
import multiprocessing
import pandas as pd
def add_values(a):
df = pd.DataFrame([{'n':a}])
return df
df = pd.DataFrame([{'n':0}])
data = [9, 4, 5]
with multiprocessing.Pool(processes=4) as pool:
#df = df.add(pool.map(add_values, data)) does not work
dfs = pool.map(add_values, data)
print(type(dfs))
# Reducing return values
for d in dfs:
df = df.add(d)
print(df)
缩减必须在单个进程中进行,因为不同的进程不共享相同的df
(相反,它们都具有相同的副本(。
作为旁注,我认为您还应该考虑使用multithreading
rahter 而不是 multiprocessing
.它可能更简单,因为线程可以共享相同的内存并减少复制内存的需求。此外,由于pandas
减少了 GIL,因此不存在一次只能执行一个线程的问题。