如何使用UDF添加多个列



问题

我想将UDF的返回值添加到单独的列中的现有数据框架中。如何以足够的方式实现这一目标?

这是我到目前为止所拥有的示例。

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType  
df = spark.createDataFrame([("Alive",4)],["Name","Number"])
df.show(1)
+-----+------+
| Name|Number|
+-----+------+
|Alive|     4|
+-----+------+
def example(n):
        return [[n+2], [n-2]]
#  schema = StructType([
#          StructField("Out1", ArrayType(IntegerType()), False),
#          StructField("Out2", ArrayType(IntegerType()), False)])
example_udf = udf(example)

现在,我可以在数据框中添加一列,如下所示

newDF = df.withColumn("Output", example_udf(df["Number"]))
newDF.show(1)
+-----+------+----------+
| Name|Number|Output    |
+-----+------+----------+
|Alive|     4|[[6], [2]]|
+-----+------+----------+

但是,我不希望两个值在同一列中,而是在单独的列中。

理想情况下,我想立即拆分输出列,以避免在此处和此处解释两次(每次返回值一次)调用示例函数(一次),但是在我的情况下,我会得到一系列数组't看到一个分裂在那里的工作方式(请注意,每个数组都包含多个值,以","。

分开

结果应该看起来像

我最终想要的是这个

+-----+------+----+----+
| Name|Number|Out1|Out2|
+-----+------+----+----+
|Alive|     4|   6|   2|
+-----+------+----+----+

请注意,结构类型返回类型的使用是可选的,不一定是解决方案的一部分。

编辑:我评论了structType的使用(并编辑了UDF分配),因为示例函数的返回类型不是必需的。但是,如果返回值像

,则必须使用它
return [6,3,2],[4,3,1]

返回 StructType,只需使用 Row

from pyspark.sql.types import StructType,StructField,IntegerType,Row
from pyspark.sql import functions as F
df = spark.createDataFrame([("Alive", 4)], ["Name", "Number"])

def example(n):
    return Row('Out1', 'Out2')(n + 2, n - 2)

schema = StructType([
    StructField("Out1", IntegerType(), False),
    StructField("Out2", IntegerType(), False)])
example_udf = F.UserDefinedFunction(example, schema)
newDF = df.withColumn("Output", example_udf(df["Number"]))
newDF = newDF.select("Name", "Number", "Output.*")
newDF.show(truncate=False)

更好地解决上述问题的方法是将输出施放在数组中,然后爆炸

import pyspark.sql.functions as f
import pyspark.sql.types as t
df = spark.createDataFrame([("Alive", 4)], ["Name", "Number"])

def example(n):
    return t.Row('Out1', 'Out2')(n + 2, n - 2)

schema = StructType([
    StructField("Out1", t.IntegerType(), False),
    StructField("Out2", t.IntegerType(), False)])
example_udf = f.udf(example, schema)
newDF = df.withColumn("Output", f.explode(f.array(example_udf(df["Number"]))))
newDF = newDF.select("Name", "Number", "Output.*")
newDF.show(truncate=False)
newDF.explain()

注意解释的输出,您将观察到该示例方法实际上只被调用一次!

在scala

import spark.implicits
val df = Seq(("Alive", 4)).toDF("Name", "Number")

没有UDF

df.
  withColumn("OutPlus",  $"Number" + 2).
  withColumn("OutMinus", $"Number" - 2).
  show
+-----+------+-------+--------+
| Name|Number|OutPlus|OutMinus|
+-----+------+-------+--------+
|Alive|     4|      6|       2|
+-----+------+-------+--------+

使用UDF使用爆炸

import org.apache.spark.sql.functions.udf
def twoItems(_i: Int) = Seq((_i + 2, _i - 2))
val twoItemsUdf = udf(twoItems(_: Int))
val exploded = df.
  withColumn("Out", explode(twoItemsUdf($"Number"))).
  withColumn("OutPlus", $"Out._1").
  withColumn("OutMinus", $"Out._2")
exploded.printSchema
root
 |-- Name: string (nullable = true)
 |-- Number: integer (nullable = false)
 |-- Out: struct (nullable = true)
 |    |-- _1: integer (nullable = false)
 |    |-- _2: integer (nullable = false)
 |-- OutPlus: integer (nullable = true)
 |-- OutMinus: integer (nullable = true)
  exploded.drop("Out").show
+-----+------+-------+--------+
| Name|Number|OutPlus|OutMinus|
+-----+------+-------+--------+
|Alive|     4|      6|       2|
+-----+------+-------+--------+

相关内容

  • 没有找到相关文章

最新更新