我有一个DF与一个巨大的可解析元数据作为一个单一的字符串列在一个Dataframe,我们称之为DFA,与collna。
我想通过函数ClassXYZ = Func1(collna)将此列,collna分解为多个列。这个函数返回一个类ClassXYZ,带有多个变量,并且这些变量中的每一个现在都必须映射到新的Column,例如ColmnA1、ColmnA2等。
我如何通过只调用一次Func1来实现从一个数据帧到另一个数据帧的转换,而不必重复调用它来创建所有的列呢?
如果我每次添加一个新列都调用这个大函数,这很容易解决,但我希望避免。
请告知工作代码或伪代码。
感谢桑杰
一般来说,你想要的是不可能直接实现的。UDF每次只能返回一列。有两种不同的方法可以克服此限制:
-
返回复杂类型的列。最一般的解决方案是
StructType
,但您也可以考虑ArrayType
或MapType
。import org.apache.spark.sql.functions.udf val df = Seq( (1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c") ).toDF("x", "y", "z") case class Foobar(foo: Double, bar: Double) val foobarUdf = udf((x: Long, y: Double, z: String) => Foobar(x * y, z.head.toInt * y)) val df1 = df.withColumn("foobar", foobarUdf($"x", $"y", $"z")) df1.show // +---+----+---+------------+ // | x| y| z| foobar| // +---+----+---+------------+ // | 1| 3.0| a| [3.0,291.0]| // | 2|-1.0| b|[-2.0,-98.0]| // | 3| 0.0| c| [0.0,0.0]| // +---+----+---+------------+ df1.printSchema // root // |-- x: long (nullable = false) // |-- y: double (nullable = false) // |-- z: string (nullable = true) // |-- foobar: struct (nullable = true) // | |-- foo: double (nullable = false) // | |-- bar: double (nullable = false)
稍后可以很容易地平化,但通常不需要这样做。
-
切换到RDD,重塑和重建DF:
import org.apache.spark.sql.types._ import org.apache.spark.sql.Row def foobarFunc(x: Long, y: Double, z: String): Seq[Any] = Seq(x * y, z.head.toInt * y) val schema = StructType(df.schema.fields ++ Array(StructField("foo", DoubleType), StructField("bar", DoubleType))) val rows = df.rdd.map(r => Row.fromSeq( r.toSeq ++ foobarFunc(r.getAs[Long]("x"), r.getAs[Double]("y"), r.getAs[String]("z")))) val df2 = sqlContext.createDataFrame(rows, schema) df2.show // +---+----+---+----+-----+ // | x| y| z| foo| bar| // +---+----+---+----+-----+ // | 1| 3.0| a| 3.0|291.0| // | 2|-1.0| b|-2.0|-98.0| // | 3| 0.0| c| 0.0| 0.0| // +---+----+---+----+-----+
假设函数后面有一个元素序列,给出如下示例:
val df = sc.parallelize(List(("Mike,1986,Toronto", 30), ("Andre,1980,Ottawa", 36), ("jill,1989,London", 27))).toDF("infoComb", "age")
df.show
+------------------+---+
| infoComb|age|
+------------------+---+
|Mike,1986,Toronto| 30|
| Andre,1980,Ottawa| 36|
| jill,1989,London| 27|
+------------------+---+
现在你可以对这个infoComb做的是,你可以开始拆分字符串,得到更多的列:
df.select(expr("(split(infoComb, ','))[0]").cast("string").as("name"), expr("(split(infoComb, ','))[1]").cast("integer").as("yearOfBorn"), expr("(split(infoComb, ','))[2]").cast("string").as("city"), $"age").show
+-----+----------+-------+---+
| name|yearOfBorn| city|age|
+-----+----------+-------+---+
|Mike| 1986|Toronto| 30|
|Andre| 1980| Ottawa| 36|
| jill| 1989| London| 27|
+-----+----------+-------+---+
希望这对你有帮助。
如果生成的列与原始列的长度相同,则可以使用withColumn函数并通过应用udf创建全新的列。在此之后,您可以删除原始列,例如:
val newDf = myDf.withColumn("newCol1", myFun(myDf("originalColumn")))
.withColumn("newCol2", myFun2(myDf("originalColumn"))
.drop(myDf("originalColumn"))
其中myFun是一个udf,定义如下:
def myFun= udf(
(originalColumnContent : String) => {
// do something with your original column content and return a new one
}
)
我选择创建一个函数来平化一列,然后与udf同时调用它。
首先定义这个:
implicit class DfOperations(df: DataFrame) {
def flattenColumn(col: String) = {
def addColumns(df: DataFrame, cols: Array[String]): DataFrame = {
if (cols.isEmpty) df
else addColumns(
df.withColumn(col + "_" + cols.head, df(col + "." + cols.head)),
cols.tail
)
}
val field = df.select(col).schema.fields(0)
val newCols = field.dataType.asInstanceOf[StructType].fields.map(x => x.name)
addColumns(df, newCols).drop(col)
}
def withColumnMany(colName: String, col: Column) = {
df.withColumn(colName, col).flattenColumn(colName)
}
}
用法很简单:
case class MyClass(a: Int, b: Int)
val df = sc.parallelize(Seq(
(0),
(1)
)).toDF("x")
val f = udf((x: Int) => MyClass(x*2,x*3))
df.withColumnMany("test", f($"x")).show()
// +---+------+------+
// | x|test_a|test_b|
// +---+------+------+
// | 0| 0| 0|
// | 1| 2| 3|
// +---+------+------+
这可以通过使用pivot函数轻松实现
df4.groupBy("year").pivot("course").sum("earnings").collect()