我想要一个正确的udf并应用于数据帧
创建Spark df:
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
熊猫功能:
@udf("integer")
def add_con(x : pd.Series):
if x>5:
return x*x
else:
return x
df.printSchema()
df.withColumn('new', add_con(df.v)).show()
输出(请更正udf(:
root
|-- id: long (nullable = true)
|-- v: double (nullable = true)
+---+----+----+
| id| v| new|
+---+----+----+
| 1| 1.0|null|
| 1| 2.0|null|
| 2| 3.0|null|
| 2| 5.0|null|
| 2|10.0|null|
+---+----+----+
这起到了作用:
from pyspark.sql import functions as f
df.withColumn('new', f.when(df.v > 5, df.v * df.v).otherwise(df.v)).show()
# +---+----+-----+
# | id| v| new|
# +---+----+-----+
# | 1| 1.0| 1.0|
# | 1| 2.0| 2.0|
# | 2| 3.0| 3.0|
# | 2| 5.0| 5.0|
# | 2|10.0|100.0|
# +---+----+-----+
您正在传递浮点字段;但返回整数类型。此外,参数类型pd.Series
也不是必需的。
给你:
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)]).toDF(*["id", "v"])
@udf("float")
def add_con(x):
if x>5:
return x*x
else:
return x
#
df.withColumn('new', add_con(df.v)).show()
这将是工作的pandas_udf
:
from pyspark.sql import functions as F
@F.pandas_udf("integer")
def add_con(x: pd.Series) -> pd.Series:
return pd.Series([e*e if e>5 else e for e in x])
df.withColumn('new', add_con(df.v)).show()
# +---+----+---+
# | id| v|new|
# +---+----+---+
# | 1| 1.0| 1|
# | 1| 2.0| 2|
# | 2| 3.0| 3|
# | 2| 5.0| 5|
# | 2|10.0|100|
# +---+----+---+
对您来说,它不起作用,因为您想直接对pd.Series
对象进行操作。如果您使用pd.Series
的元素,然后将结果转换回pd.Series
,则它会起作用。