极坐标将性能应用于自定义函数



除了一种情况外,我很喜欢 Polars 比熊猫显着加速。我是 Polars 的新手,所以这可能只是我的错误用法。无论如何,这是玩具示例: 在单列上,我需要应用自定义函数,在这种情况下它是从probablypeople库 (https://github.com/datamade/probablepeople)parse的,但问题是通用的。

普通熊猫apply具有与 Polars 类似的运行时间,但具有 (https://github.com/nalepae/pandarallel)parallel_apply的熊猫的加速与核心数成正比。

在我看来,Polars 只使用单个内核进行自定义功能,还是我错过了什么?

如果我正确使用Polars,也许有可能为Polars创建像pandaralell这样的工具?

!pip install probablepeople
!pip install pandarallel
import pandas as pd
import probablepeople as pp
import polars as pl
from pandarallel import pandarallel
AMOUNT = 1000_000
#Pandas:
df = pd.DataFrame({'a': ["Mr. Joe Smith"]})
df = df.loc[df.index.repeat(AMOUNT)].reset_index(drop=True)
df['b'] = df['a'].apply(pp.parse)
#Pandarallel:
pandarallel.initialize(progress_bar=True)
df['b_multi'] = df['a'].parallel_apply(pp.parse)
#Polars:
dfp = pl.DataFrame({'a': ["Mr. Joe Smith"]})
dfp = dfp.select(pl.all().repeat_by(AMOUNT).explode())
dfp = dfp.with_columns(pl.col('a').apply(pp.parse).alias('b'))

pandarallel使用多处理。

您还可以将多处理与极坐标一起使用。

这样的函数可能如下所示:

def parallel_apply(func, column, return_dtype=pl.Series, chunksize=128):
with multiprocessing.get_context("spawn").Pool() as pool:
results = pool.imap(func, track(column), chunksize)
return return_dtype(results)

rich.progress.track()用于生成一个漂亮的进度条 - 它与pip捆绑在一起。


对于这个特定的用例,pp.parse返回一个元组列表,例如

[(Mr., PrefixMarital), (Joe, GivenName), ...

我们可以返回以下形式的字典,而不是返回pl.Series(results)

[{"PrefixMarital": ["Mr."], "GivenName": ["Joe"], ...}, ...

我们使用列表值,因为pp.parse每个"键"可以生成多个值。

Polars 会将它们转换为structs然后可以"取消嵌套">到实际列中 - 使用字典的键作为列名。

import multiprocessing
import polars as pl
import probablepeople as pp
from pip._vendor.rich.progress import track
def parallel_apply(func, iterable, return_dtype=pl.Series, chunksize=128):
with multiprocessing.get_context("spawn").Pool() as pool:
result = pool.imap(func, track(iterable), chunksize)
return return_dtype(result)
# Custom return_dtype for `pp.parse`
def return_dtype(result):
rows = []
for item in result:
row = {}
for value, category in item:
row.setdefault(category, []).append(value)
rows.append(row)
"""Create empty dict from all keys seen
Merge with each row so they all have the same keys
This fills empty "columns" with `null`"""
empty = dict.fromkeys(set().union(*rows))
return pl.Series(empty | row for row in rows)
if __name__ == "__main__":
AMOUNT = 1_000_000
df = pl.DataFrame({"a": ["Mr. Joe Smith"]})
df = df.select(pl.all().repeat_by(AMOUNT).explode())
df = df.with_column(
pl.col("a").map(lambda col: parallel_apply(pp.parse, col, return_dtype))
.alias("b")
)
print(df)
print(df.unnest("b"))

.unnest()之前

┌───────────────────────┬─────────────────────────────────────┐
│ a                     | b                                   │
│ ---                   | ---                                 │
│ str                   | struct[4]                           │
╞═══════════════════════╪═════════════════════════════════════╡
│ Mr. Joe Smith         | {["Joe"],["Mr."],null,["Smith"]}    │
├───────────────────────┼─────────────────────────────────────┤
│ Mrs. I & II Bob Hello | {["Bob"],["Mrs."],["I", "&", "II... │
└───────────────────────┴─────────────────────────────────────┘

.unnest()

shape: (2, 5)
┌───────────────────────┬───────────┬───────────────┬─────────────────────────────────┬───────────┐
│ a                     | GivenName | PrefixMarital | CorporationNameBranchIdentifier | Surname   │
│ ---                   | ---       | ---           | ---                             | ---       │
│ str                   | list[str] | list[str]     | list[str]                       | list[str] │
╞═══════════════════════╪═══════════╪═══════════════╪═════════════════════════════════╪═══════════╡
│ Mr. Joe Smith         | ["Joe"]   | ["Mr."]       | null                            | ["Smith"] │
├───────────────────────┼───────────┼───────────────┼─────────────────────────────────┼───────────┤
│ Mrs. I & II Bob Hello | ["Bob"]   | ["Mrs."]      | ["I", "&", "II"]                | ["Hello"] │
└───────────────────────┴───────────┴───────────────┴─────────────────────────────────┴───────────┘

然后,您可以根据需要进行进一步处理.explode(pl.exclude("a"))

性能:

使用您的1_000_000示例 - 在我的机器上,运行时是:

多处理持续时间
1分23秒
没有5m2s

相关内容

  • 没有找到相关文章

最新更新