Is there a way to transform an org.apache.spark.sql.DataFrame like this
Predictor icaoCode num1 num2
P1 OTHH 1.1 1.2
P1 ZGGG 2.1 2.2
P2 OTHH 3.1 3.2
P2 ZGGG 4.1 4.2
P3 OTHH 5.1 5.2
P3 ZGGG 6.1 6.2
. . . .
. . . .
. . . .
into a DataFrame like this?
icaoCode P1.num1 P1.num2 P2.num1 P2.num2 P3.num1 P3.num2 ...
OTHH 1.1 1.2 3.1 3.2 5.1 5.2 ...
ZGGG 2.1 2.2 4.1 4.2 6.1 6.2 ...
. . . . . . . ...
. . . . . . . ...
. . . . . . . ...
Predictor and icaoCode can each have an arbitrary number of values.
With Spark 1.6.0 there is a pivot function to transform/transpose your data. In your case it needs some preprocessing to get the data into the right shape for pivot. Here is an example of how I did it:
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{col, lit, udf}

def doPivot(): Unit = {
  val sqlContext: SQLContext = new org.apache.spark.sql.SQLContext(sc)

  // dummy data
  val r1 = Input("P1", "OTHH", 1.1, 1.2)
  val r2 = Input("P1", "ZGGG", 2.1, 2.2)
  val r3 = Input("P2", "OTHH", 3.1, 3.2)
  val records = Seq(r1, r2, r3)
  val df = sqlContext.createDataFrame(records)

  // prepare data for pivot: build the target column name, e.g. "P1.num1"
  val fullName: ((String, String) => String) = (predictor: String, num: String) => {
    predictor + "." + num
  }
  val udfFullName = udf(fullName)

  val dfFullName = df
    .withColumn("num1-complete", udfFullName(col("predictor"), lit("num1")))
    .withColumn("num2-complete", udfFullName(col("predictor"), lit("num2")))

  // melt num1/num2 into a single (value-name, value) pair per row
  val dfPrepared = dfFullName.select(col("icaoCode"), col("num1") as "num", col("num1-complete") as "value")
    .unionAll(dfFullName.select(col("icaoCode"), col("num2") as "num", col("num2-complete") as "value"))

  // transpose/pivot dataframe: one column per distinct "value" name
  val dfPivoted = dfPrepared.groupBy(col("icaoCode")).pivot("value").mean("num")

  dfPivoted.show()
}

case class Input(predictor: String, icaoCode: String, num1: Double, num2: Double)
The final DataFrame should work for you:
+--------+-------+-------+-------+-------+
|icaoCode|P1.num1|P1.num2|P2.num1|P2.num2|
+--------+-------+-------+-------+-------+
| OTHH| 1.1| 1.2| 3.1| 3.2|
| ZGGG| 2.1| 2.2| null| null|
+--------+-------+-------+-------+-------+
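If you are able to move past 1.6, the preprocessing UDF and the unionAll step can be dropped: on newer Spark versions (2.x and later, where SparkSession replaces SQLContext), pivot accepts multiple aggregation expressions, which produces the combined column names directly. A minimal sketch, assuming Spark 2.4+ and using first as the aggregate since each (predictor, icaoCode) pair occurs once; note the generated names use an underscore (P1_num1) rather than a dot:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.first

case class Input(predictor: String, icaoCode: String, num1: Double, num2: Double)

object PivotExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("pivot").getOrCreate()
    import spark.implicits._

    val df = Seq(
      Input("P1", "OTHH", 1.1, 1.2),
      Input("P1", "ZGGG", 2.1, 2.2),
      Input("P2", "OTHH", 3.1, 3.2)
    ).toDF()

    // One pivot with two aggregations: yields columns P1_num1, P1_num2, P2_num1, ...
    df.groupBy($"icaoCode")
      .pivot("predictor")
      .agg(first($"num1") as "num1", first($"num2") as "num2")
      .show()
  }
}
```

Missing combinations (here ZGGG/P2) still come out as null, exactly as in the output above.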