除了一种情况外,我很喜欢 Polars 比熊猫显着加速。我是 Polars 的新手,所以这可能只是我的错误用法。无论如何,这是玩具示例: 在单列上,我需要应用自定义函数,在这种情况下它是从probablypeople
库 (https://github.com/datamade/probablepeople)parse
的,但问题是通用的。
普通熊猫apply
具有与 Polars 类似的运行时间,但具有 (https://github.com/nalepae/pandarallel)parallel_apply
的熊猫的加速与核心数成正比。
在我看来,Polars 只使用单个内核进行自定义功能,还是我错过了什么?
如果我正确使用Polars,也许有可能为Polars创建像pandaralell
这样的工具?
!pip install probablepeople
!pip install pandarallel
import pandas as pd
import probablepeople as pp
import polars as pl
from pandarallel import pandarallel
AMOUNT = 1000_000
#Pandas:
df = pd.DataFrame({'a': ["Mr. Joe Smith"]})
df = df.loc[df.index.repeat(AMOUNT)].reset_index(drop=True)
df['b'] = df['a'].apply(pp.parse)
#Pandarallel:
pandarallel.initialize(progress_bar=True)
df['b_multi'] = df['a'].parallel_apply(pp.parse)
#Polars:
dfp = pl.DataFrame({'a': ["Mr. Joe Smith"]})
dfp = dfp.select(pl.all().repeat_by(AMOUNT).explode())
dfp = dfp.with_columns(pl.col('a').apply(pp.parse).alias('b'))
pandarallel
使用多处理。
您还可以将多处理与极坐标一起使用。
这样的函数可能如下所示:
def parallel_apply(func, column, return_dtype=pl.Series, chunksize=128):
with multiprocessing.get_context("spawn").Pool() as pool:
results = pool.imap(func, track(column), chunksize)
return return_dtype(results)
rich.progress.track()
用于生成一个漂亮的进度条 - 它与pip
捆绑在一起。
对于这个特定的用例,pp.parse
返回一个元组列表,例如
[(Mr., PrefixMarital), (Joe, GivenName), ...
我们可以返回以下形式的字典,而不是返回pl.Series(results)
:
[{"PrefixMarital": ["Mr."], "GivenName": ["Joe"], ...}, ...
我们使用列表值,因为pp.parse
每个"键"可以生成多个值。
Polars 会将它们转换为structs
然后可以"取消嵌套">到实际列中 - 使用字典的键作为列名。
import multiprocessing
import polars as pl
import probablepeople as pp
from pip._vendor.rich.progress import track
def parallel_apply(func, iterable, return_dtype=pl.Series, chunksize=128):
with multiprocessing.get_context("spawn").Pool() as pool:
result = pool.imap(func, track(iterable), chunksize)
return return_dtype(result)
# Custom return_dtype for `pp.parse`
def return_dtype(result):
rows = []
for item in result:
row = {}
for value, category in item:
row.setdefault(category, []).append(value)
rows.append(row)
"""Create empty dict from all keys seen
Merge with each row so they all have the same keys
This fills empty "columns" with `null`"""
empty = dict.fromkeys(set().union(*rows))
return pl.Series(empty | row for row in rows)
if __name__ == "__main__":
AMOUNT = 1_000_000
df = pl.DataFrame({"a": ["Mr. Joe Smith"]})
df = df.select(pl.all().repeat_by(AMOUNT).explode())
df = df.with_column(
pl.col("a").map(lambda col: parallel_apply(pp.parse, col, return_dtype))
.alias("b")
)
print(df)
print(df.unnest("b"))
在.unnest()
之前
┌───────────────────────┬─────────────────────────────────────┐
│ a | b │
│ --- | --- │
│ str | struct[4] │
╞═══════════════════════╪═════════════════════════════════════╡
│ Mr. Joe Smith | {["Joe"],["Mr."],null,["Smith"]} │
├───────────────────────┼─────────────────────────────────────┤
│ Mrs. I & II Bob Hello | {["Bob"],["Mrs."],["I", "&", "II... │
└───────────────────────┴─────────────────────────────────────┘
.unnest()
后
shape: (2, 5)
┌───────────────────────┬───────────┬───────────────┬─────────────────────────────────┬───────────┐
│ a | GivenName | PrefixMarital | CorporationNameBranchIdentifier | Surname │
│ --- | --- | --- | --- | --- │
│ str | list[str] | list[str] | list[str] | list[str] │
╞═══════════════════════╪═══════════╪═══════════════╪═════════════════════════════════╪═══════════╡
│ Mr. Joe Smith | ["Joe"] | ["Mr."] | null | ["Smith"] │
├───────────────────────┼───────────┼───────────────┼─────────────────────────────────┼───────────┤
│ Mrs. I & II Bob Hello | ["Bob"] | ["Mrs."] | ["I", "&", "II"] | ["Hello"] │
└───────────────────────┴───────────┴───────────────┴─────────────────────────────────┴───────────┘
然后,您可以根据需要进行进一步处理.explode(pl.exclude("a"))
。
性能:
使用您的1_000_000
示例 - 在我的机器上,运行时是:
多处理 | 持续时间 |
---|---|
是 | 1分23秒 |
没有 | 5m2s |