我有一个非常大的熊猫数据框,我想在上面映射许多函数。 因为框架很大,我写了一些代码来并行化它:
import pandas as pd
import numpy as np
from multiprocessing import cpu_count(), Pool
my_frame = pd.DataFrame(...) # A large data frame with the column "data"
def parallel_map(series: pd.Series, func):
cores = cpu_count()
partitions = cores
data_split = np.array_split(series, partitions)
print(f"Parallelizing with {cores} cores...")
with Pool(cores) as pool:
data = pd.concat(pool.map(func, data_split))
pool.join()
return data
我想用pd.Series.map
来称呼它,即我想计算每一行的东西;像这样:
def transform_data(entry):
# Do expensive stuff
return entry
非并行,我现在可以做
my_frame["data"].map(transform_data)
但是,对于并行版本,我需要在全局命名空间中定义一个附加函数来反转调用者,因为Pool.map
f(x)
适用,但我想调用x.f()
。该函数需要能够由池运行:
def inverted_transform_data(column: pd.Series):
return column.map(transform_data)
现在我可以像这样调用并行版本:
parallel_map(data=my_frame["data"], func=inverted_transform_data)
问题是我想为许多需要按顺序处理的功能执行此操作,即transform_data1, transform_data2, ...
.这需要我为每个全局包装器函数创建。
有没有更好的选择仍然可以腌制?
Dask! https://dask.org/
Dask是一个专门针对平行熊猫的项目。我强烈建议您考虑将其用于您的用例。如果您只想在坚持使用熊猫的同时提高性能,请查看此处的文档:
https://pandas.pydata.org/pandas-docs/stable/enhancingperf.html
我发现这篇文章特别有帮助:
https://engineering.upside.com/a-beginners-guide-to-optimizing-pandas-code-for-speed-c09ef2c6a4d6
编辑:
使用 dask,您可以做到:
import dask.dataframe as dd
df = # import method such as dd.read_csv("df.csv")
df.apply(func, ...) # or dd.data_col.apply(func, ...)
df.compute()
我最终得到了一个"低预算"的解决方案,因为我不想引入 dask 作为依赖项。它只是创建一个可调用的包装类:
class InvertedCallerMap(object):
def __init__(self, func):
"""
Required so the parallel map can call x.f() instead of f(x) without running into pickling issues
:param func: Function to invert from x.f() to f(x)
"""
self.func = func
def __call__(self, column: pd.Series):
return column.map(self.func)
def parallel_map(series, func, invert=True):
cores = cpu_count()
partitions = cores
data_split = np.array_split(series, partitions)
if invert:
func = InvertedCallerMap(func=func)
with Pool(cores) as pool:
data = pd.concat(pool.map(func, data_split))
pool.join()
return data