如何将if else udf pandas应用于列上的pyspark数据帧



我想要一个正确的udf并应用于数据帧

创建Spark df:

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

熊猫功能:

@udf("integer")
def add_con(x : pd.Series):
if x>5:
return x*x
else:
return x
df.printSchema()
df.withColumn('new', add_con(df.v)).show()

输出(请更正udf(

root
|-- id: long (nullable = true)
|-- v: double (nullable = true)
+---+----+----+
| id|   v| new|
+---+----+----+
|  1| 1.0|null|
|  1| 2.0|null|
|  2| 3.0|null|
|  2| 5.0|null|
|  2|10.0|null|
+---+----+----+

这起到了作用:

from pyspark.sql import functions as f
df.withColumn('new', f.when(df.v > 5, df.v * df.v).otherwise(df.v)).show()
# +---+----+-----+
# | id|   v|  new|
# +---+----+-----+
# |  1| 1.0|  1.0|
# |  1| 2.0|  2.0|
# |  2| 3.0|  3.0|
# |  2| 5.0|  5.0|
# |  2|10.0|100.0|
# +---+----+-----+

您正在传递浮点字段;但返回整数类型。此外,参数类型pd.Series也不是必需的。

给你:

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)]).toDF(*["id", "v"])
@udf("float")
def add_con(x):
if x>5:
return x*x
else:
return x
# 
df.withColumn('new', add_con(df.v)).show()

这将是工作的pandas_udf:

from pyspark.sql import functions as F
@F.pandas_udf("integer")
def add_con(x: pd.Series) -> pd.Series:
return pd.Series([e*e if e>5 else e for e in x])
df.withColumn('new', add_con(df.v)).show()
# +---+----+---+
# | id|   v|new|
# +---+----+---+
# |  1| 1.0|  1|
# |  1| 2.0|  2|
# |  2| 3.0|  3|
# |  2| 5.0|  5|
# |  2|10.0|100|
# +---+----+---+

对您来说,它不起作用,因为您想直接对pd.Series对象进行操作。如果您使用pd.Series的元素,然后将结果转换回pd.Series,则它会起作用。

最新更新