Vectorized pandas UDF in PySpark with a dict lookup



I am trying to learn how to use pandas_udf in PySpark (Databricks).

One of the tasks is to write a pandas_udf that sorts by day of the week. I know how to do this with a regular Spark UDF:

# pandas_udf is imported here because the later snippets use it.
from pyspark.sql.functions import col, pandas_udf, udf

data = [('Sun', 282905.5), ('Mon', 238195.5), ('Thu', 264620.0), ('Sat', 278482.0), ('Wed', 227214.0)]
schema = 'day string, avg_users double'
df = spark.createDataFrame(data, schema)
print('Original')
df.show()

# Named spark_udf so the function does not shadow pyspark's udf decorator.
@udf()
def spark_udf(day: str) -> str:
    dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
           "Fri": "5", "Sat": "6", "Sun": "7"}
    # Prefix the day with its number so a plain string sort gives weekday order.
    return dow[day] + '-' + day

print('with spark udf')
final_df = df.select(col('avg_users'), spark_udf(col('day')).alias('day')).sort('day')
final_df.show()

This prints:

Original
+---+-----------+
|day|  avg_users|
+---+-----------+
|Sun|   282905.5|
|Mon|   238195.5|
|Thu|   264620.0|
|Sat|   278482.0|
|Wed|   227214.0|
+---+-----------+
with spark udf
+-----------+-----+
|  avg_users|  day|
+-----------+-----+
|   238195.5|1-Mon|
|   227214.0|3-Wed|
|   264620.0|4-Thu|
|   278482.0|6-Sat|
|   282905.5|7-Sun|
+-----------+-----+

Trying to do the same with pandas_udf:

import pandas as pd

@pandas_udf('string')
def p_udf(day: pd.Series) -> pd.Series:
    dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
           "Fri": "5", "Sat": "6", "Sun": "7"}
    return dow[day.str] + '-' + day.str

p_final_df = df.select(df.avg_users, p_udf(df.day))
print('with pandas udf')
p_final_df.show()

I get KeyError: <pandas.core.strings.accessor.StringMethods object at 0x7f31197cd9a0>. I think it comes from dow[day.str], which kind of makes sense.

I have also tried:

return dow[day.str.__str__()] + '-' + day.str # KeyError: .... StringMethods
return dow[str(day.str)] + '-' + day.str      # KeyError: .... StringMethods
return dow[day.str.upper()] + '-' + day.str   # TypeError: unhashable type: 'Series'
return f"{dow[day.str]}-{day.str}"            # KeyError: .... StringMethods (but I think this is logically
# wrong, returning a string instead of a Series)
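
The errors listed above can be reproduced in plain pandas, without Spark. The following is a minimal sketch, assuming only that pandas is installed; the sample values in `day` are illustrative and the variable names are not part of the original code.

```python
import pandas as pd

dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
       "Fri": "5", "Sat": "6", "Sun": "7"}
day = pd.Series(["Sun", "Mon", "Thu"])  # illustrative sample

# day.str is an accessor object, not a string, so it is never a key of dow.
accessor_type = type(day.str).__name__

try:
    dow[day.str]
    accessor_error = None
except KeyError as exc:
    accessor_error = type(exc).__name__

# day.str.upper() returns a new Series; a Series is unhashable,
# so it cannot be used as a dict key at all.
try:
    dow[day.str.upper()]
    series_error = None
except TypeError as exc:
    series_error = type(exc).__name__

print(accessor_type, accessor_error, series_error)
```

In both cases the dict is indexed once with a whole object, rather than once per element of the Series.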

I have read:

  • API reference
  • PySpark equivalent of a lambda function in a Pandas UDF
  • How to convert a scalar PySpark UDF to a Pandas UDF
  • Pandas UDF in PySpark

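Using the .str accessor on its own, without calling an actual vectorized string method on it, is what causes the error. In addition, you cannot use an entire Series as a key into the dow dict. Use the map method of pandas.Series instead: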

from pyspark.sql.functions import pandas_udf
import pandas as pd

data = [('Sun', 282905.5), ('Mon', 238195.5), ('Thu', 264620.0), ('Sat', 278482.0), ('Wed', 227214.0)]
schema = 'day string, avg_users double'
df = spark.createDataFrame(data, schema)

@pandas_udf("string")
def p_udf(day: pd.Series) -> pd.Series:
    dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
           "Fri": "5", "Sat": "6", "Sun": "7"}
    # Series.map looks up each element of the Series in the dict.
    return day.map(dow) + '-' + day

df.select(df.avg_users, p_udf(df.day).alias("day")).show()
+---------+-----+
|avg_users|  day|
+---------+-----+
| 282905.5|7-Sun|
| 238195.5|1-Mon|
| 264620.0|4-Thu|
| 278482.0|6-Sat|
| 227214.0|3-Wed|
+---------+-----+
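
Because the body of p_udf is ordinary pandas, it can be checked outside Spark. The sketch below assumes only pandas; `label_days` is a hypothetical helper name that wraps the same expression. It also shows what happens to a value that is not in the dict.

```python
import pandas as pd

DOW = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
       "Fri": "5", "Sat": "6", "Sun": "7"}

def label_days(day: pd.Series) -> pd.Series:
    """Same expression as the body of p_udf, callable without Spark."""
    # Series.map looks up every element in the dict in one call.
    return day.map(DOW) + "-" + day

labels = label_days(pd.Series(["Sun", "Mon", "Thu", "Sat", "Wed"]))
print(labels.sort_values().tolist())

# A value missing from the dict maps to NaN, so its label is missing too.
unknown = label_days(pd.Series(["Mon", "Xyz"]))
print(unknown.isna().tolist())
```

Unlike the row-wise UDF, which raises KeyError on an unknown day, the map version quietly yields a missing value, so unexpected day strings may need to be handled explicitly.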

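How about returning a DataFrame from grouped data and calling orderBy after the UDF has run? pandas sort_values is quite problematic inside UDFs.

Basically, inside the function I generate the day numbers with Python and then concatenate them back onto the day column. The snippet shows only that transformation; to sort by day, chain .orderBy('day') onto the result.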

import calendar

import pandas as pd

def sortdf(pdf: pd.DataFrame) -> pd.DataFrame:
    day = pdf.day
    # calendar.day_abbr runs Mon..Sun, so Mon maps to 0; add 1 to get 1..7.
    pdf = pdf.assign(day=(day.map(dict(zip(calendar.day_abbr, range(7)))) + 1).astype(str) + '-' + day)
    return pdf

df.groupby('avg_users').applyInPandas(sortdf, schema=df.schema).show()
+-----+---------+
|  day|avg_users|
+-----+---------+
|3-Wed| 227214.0|
|1-Mon| 238195.5|
|4-Thu| 264620.0|
|6-Sat| 278482.0|
|7-Sun| 282905.5|
+-----+---------+
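
The lookup built from calendar.day_abbr can be inspected in plain pandas. This is a minimal sketch, assuming the default English day names (calendar.day_abbr follows the current locale); `number_days` and `day_index` are hypothetical names, and the sample rows are taken from the data above.

```python
import calendar

import pandas as pd

# calendar.day_abbr starts at Monday, so Mon -> 0 ... Sun -> 6.
# Assumption: the abbreviations are English ("Mon", "Tue", ...).
day_index = dict(zip(calendar.day_abbr, range(7)))

def number_days(pdf: pd.DataFrame) -> pd.DataFrame:
    """Prefix each day with its 1-based weekday number, as sortdf does."""
    day = pdf["day"]
    # When every day is found the mapped values are integers;
    # an unmapped day would turn the column into floats with NaN.
    prefix = (day.map(day_index) + 1).astype(str)
    return pdf.assign(day=prefix + "-" + day)

pdf = pd.DataFrame({"day": ["Sun", "Mon", "Wed"],
                    "avg_users": [282905.5, 238195.5, 227214.0]})
result = number_days(pdf)
print(result)
```

Building the mapping from calendar avoids typing the dict by hand, at the cost of depending on the locale matching the day strings in the data.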
