如何对数据帧执行多进程操作



我必须在数据帧上执行大量操作,使用单个内核需要很长时间。我正在尝试实现多处理。

现在,当我试图弄清楚它是如何工作的时,所以我使用的是更简单的版本,我只想从数据中添加值

import multiprocessing
import pandas as pd
def add_values(a):
    df = pd.DataFrame([{'n':a}])
    return df
df = pd.DataFrame([{'n':0}])
data = [9, 4, 5]
with multiprocessing.Pool(processes=4) as pool:
    df = df.add(pool.map(add_values, data))
df

我希望 df 返回一个 n=18 的数据帧,但我收到此错误消息 值错误:无法强制到系列,长度必须为 1:给定 3

这里的问题是如何处理多处理调用的返回值。 pool.map()返回一个list。在这种特殊情况下,它将是一个数据帧列表,即您的调用扩展到什么等同于 df = df.add([dfn9, dfn4, dfn5]) ,其中dfnX是不同的数据帧。

这个输入既不是预期的,也不是由df.add()处理的,它期望一些东西可以变成一个pd.Series的对象并添加到原始帧中。相反,您需要获取此列表并"手动"减少它,例如:

import multiprocessing
import pandas as pd
def add_values(a):
    df = pd.DataFrame([{'n':a}])
    return df
df = pd.DataFrame([{'n':0}])
data = [9, 4, 5]
with multiprocessing.Pool(processes=4) as pool:
    #df = df.add(pool.map(add_values, data)) does not work
    dfs = pool.map(add_values, data)
print(type(dfs))
# Reducing return values
for d in dfs:
    df = df.add(d)
print(df)

缩减必须在单个进程中进行,因为不同的进程不共享相同的df(相反,它们都具有相同的副本(。

作为旁注,我认为您还应该考虑使用multithreading rahter 而不是 multiprocessing .它可能更简单,因为线程可以共享相同的内存并减少复制内存的需求。此外,由于pandas减少了 GIL,因此不存在一次只能执行一个线程的问题。

相关内容

  • 没有找到相关文章

最新更新