并行映射 x.f() 而不是许多函数的 f(x)



我有一个非常大的熊猫数据框,我想在上面映射许多函数。 因为框架很大,我写了一些代码来并行化它:

import pandas as pd
import numpy as np
from multiprocessing import cpu_count(), Pool
my_frame = pd.DataFrame(...) # A large data frame with the column "data"
def parallel_map(series: pd.Series, func):
cores = cpu_count()
partitions = cores
data_split = np.array_split(series, partitions)
print(f"Parallelizing with {cores} cores...")
with Pool(cores) as pool:
data = pd.concat(pool.map(func, data_split))
pool.join()
return data

我想用pd.Series.map来称呼它,即我想计算每一行的东西;像这样:

def transform_data(entry):
# Do expensive stuff
return entry

非并行,我现在可以做

my_frame["data"].map(transform_data)

但是,对于并行版本,我需要在全局命名空间中定义一个附加函数来反转调用者,因为Pool.mapf(x)适用,但我想调用x.f()。该函数需要能够由池运行:

def inverted_transform_data(column: pd.Series):
return column.map(transform_data)

现在我可以像这样调用并行版本:

parallel_map(data=my_frame["data"], func=inverted_transform_data)

问题是我想为许多需要按顺序处理的功能执行此操作,即transform_data1, transform_data2, ....这需要我为每个全局包装器函数创建。

有没有更好的选择仍然可以腌制?

Dask! https://dask.org/

Dask是一个专门针对平行熊猫的项目。我强烈建议您考虑将其用于您的用例。如果您只想在坚持使用熊猫的同时提高性能,请查看此处的文档:

https://pandas.pydata.org/pandas-docs/stable/enhancingperf.html

我发现这篇文章特别有帮助:

https://engineering.upside.com/a-beginners-guide-to-optimizing-pandas-code-for-speed-c09ef2c6a4d6

编辑:

使用 dask,您可以做到:

import dask.dataframe as dd
df = # import method such as dd.read_csv("df.csv")
df.apply(func, ...) # or dd.data_col.apply(func, ...)
df.compute()

我最终得到了一个"低预算"的解决方案,因为我不想引入 dask 作为依赖项。它只是创建一个可调用的包装类:

class InvertedCallerMap(object):
def __init__(self, func):
"""
Required so the parallel map can call x.f() instead of f(x) without running into pickling issues
:param func: Function to invert from x.f() to f(x)
"""
self.func = func
def __call__(self, column: pd.Series):
return column.map(self.func)

def parallel_map(series, func, invert=True):
cores = cpu_count()
partitions = cores
data_split = np.array_split(series, partitions)
if invert:
func = InvertedCallerMap(func=func)
with Pool(cores) as pool:
data = pd.concat(pool.map(func, data_split))
pool.join()
return data

最新更新