Question
I want to add the return values of a UDF to an existing DataFrame as separate columns. How can I achieve this in a resourceful way?
Here is an example of what I have so far.
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
df = spark.createDataFrame([("Alive",4)],["Name","Number"])
df.show(1)
+-----+------+
| Name|Number|
+-----+------+
|Alive| 4|
+-----+------+
def example(n):
    return [[n + 2], [n - 2]]

# schema = StructType([
#     StructField("Out1", ArrayType(IntegerType()), False),
#     StructField("Out2", ArrayType(IntegerType()), False)])
example_udf = udf(example)
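Outside of Spark, the helper simply returns a nested list; a quick plain-Python check (no Spark required) of the shape the UDF will hand back for each row:

```python
def example(n):
    # same helper as above: two single-element lists
    return [[n + 2], [n - 2]]

result = example(4)
print(result)  # [[6], [2]]
```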
Now, I can add a column to the DataFrame like this:
newDF = df.withColumn("Output", example_udf(df["Number"]))
newDF.show(1)
+-----+------+----------+
| Name|Number|    Output|
+-----+------+----------+
|Alive| 4|[[6], [2]]|
+-----+------+----------+
However, I don't want both values in the same column; I want them in separate columns.
Ideally, I would like to split the Output column right away so that the example function only has to be called once rather than twice (once per return value), as explained in similar answers. In my case, though, I get an array of arrays, and I can't see how a split would work there (note that each array contains multiple values, separated by ",").
What I ultimately want is this:
+-----+------+----+----+
| Name|Number|Out1|Out2|
+-----+------+----+----+
|Alive| 4| 6| 2|
+-----+------+----+----+
Note that the use of a StructType return type is optional; it doesn't necessarily have to be part of the solution.
EDIT: I commented out the use of StructType (and edited the udf assignment) because it isn't necessary for the return type of the example function. However, it would have to be used if the return value were something like return [6,3,2],[4,3,1]
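For that edited case, a plain-Python sketch of a helper producing the two-lists shape mentioned above (the function name and the formulas generating [6,3,2] and [4,3,1] from n=4 are my own illustration, not from the original code):

```python
def example_lists(n):
    # returns a pair of lists -> each would need an ArrayType(IntegerType())
    # field in the StructType schema, as in the commented-out schema above
    return [n + 2, n - 1, n - 2], [n, n - 1, n - 3]

print(example_lists(4))  # ([6, 3, 2], [4, 3, 1])
```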
To return a StructType, just use Row:
from pyspark.sql.types import StructType,StructField,IntegerType,Row
from pyspark.sql import functions as F
df = spark.createDataFrame([("Alive", 4)], ["Name", "Number"])
def example(n):
    return Row('Out1', 'Out2')(n + 2, n - 2)

schema = StructType([
    StructField("Out1", IntegerType(), False),
    StructField("Out2", IntegerType(), False)])
example_udf = F.udf(example, schema)
newDF = df.withColumn("Output", example_udf(df["Number"]))
newDF = newDF.select("Name", "Number", "Output.*")
newDF.show(truncate=False)
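`Row('Out1', 'Out2')(n + 2, n - 2)` is a two-step pattern: it first builds a Row class with the field names Out1 and Out2, then instantiates it with the two values, much like a namedtuple. A plain-Python sketch of that pattern, using `collections.namedtuple` as a stand-in for Spark's `Row` (the name `Out` is mine, for illustration only):

```python
from collections import namedtuple

# Stand-in for pyspark.sql.types.Row: declare the field names first...
Out = namedtuple("Out", ["Out1", "Out2"])

def example(n):
    # ...then fill them with values, as Row('Out1', 'Out2')(n + 2, n - 2) does
    return Out(n + 2, n - 2)

row = example(4)
print(row.Out1, row.Out2)  # 6 2
```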
A better way to solve the above problem is to cast the output into an array and then explode it:
import pyspark.sql.functions as f
import pyspark.sql.types as t
df = spark.createDataFrame([("Alive", 4)], ["Name", "Number"])
def example(n):
    return t.Row('Out1', 'Out2')(n + 2, n - 2)

schema = t.StructType([
    t.StructField("Out1", t.IntegerType(), False),
    t.StructField("Out2", t.IntegerType(), False)])
example_udf = f.udf(example, schema)
newDF = df.withColumn("Output", f.explode(f.array(example_udf(df["Number"]))))
newDF = newDF.select("Name", "Number", "Output.*")
newDF.show(truncate=False)
newDF.explain()
Note the output of explain: you will observe that the example method is actually called only once!
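The explode step itself can be pictured in plain Python: each element of a row's array column becomes its own output row, so wrapping the single struct in a one-element array and exploding it leaves exactly one row while forcing the struct to be materialized once before its fields are projected. A rough toy model of those semantics (no Spark involved; the function and tuple layout are mine, for illustration only):

```python
def explode_rows(rows):
    # one output row per element of each row's array column,
    # mirroring what f.explode does to an array column
    return [(name, number, item)
            for name, number, arr in rows
            for item in arr]

# one input row whose array column holds a single (Out1, Out2) struct
rows = [("Alive", 4, [(6, 2)])]
print(explode_rows(rows))  # [('Alive', 4, (6, 2))]
```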
In Scala
import spark.implicits._
val df = Seq(("Alive", 4)).toDF("Name", "Number")
Without a UDF
df.
  withColumn("OutPlus", $"Number" + 2).
  withColumn("OutMinus", $"Number" - 2).
  show
+-----+------+-------+--------+
| Name|Number|OutPlus|OutMinus|
+-----+------+-------+--------+
|Alive| 4| 6| 2|
+-----+------+-------+--------+
With a UDF, using explode
import org.apache.spark.sql.functions.{explode, udf}
def twoItems(_i: Int) = Seq((_i + 2, _i - 2))
val twoItemsUdf = udf(twoItems(_: Int))
val exploded = df.
  withColumn("Out", explode(twoItemsUdf($"Number"))).
  withColumn("OutPlus", $"Out._1").
  withColumn("OutMinus", $"Out._2")
exploded.printSchema
root
|-- Name: string (nullable = true)
|-- Number: integer (nullable = false)
|-- Out: struct (nullable = true)
| |-- _1: integer (nullable = false)
| |-- _2: integer (nullable = false)
|-- OutPlus: integer (nullable = true)
|-- OutMinus: integer (nullable = true)
exploded.drop("Out").show
+-----+------+-------+--------+
| Name|Number|OutPlus|OutMinus|
+-----+------+-------+--------+
|Alive| 4| 6| 2|
+-----+------+-------+--------+