我正在尝试将一些输入转换为火花数据帧中我想要的格式。我拥有的输入是这个案例类的序列,最多有 10,000,000 个类(或者可能是我将其转换为案例类之前的 Json 字符串......
case class Element(paramName: String, value: Int, time: Int)
因此,我想要这样的数据帧:
|Time | ParamA | ParamB | ParamC | Param 10,000 |
|1000 | 432432 | 8768768 | Null....... | 75675678622 |
|2000 | Null.......| Null.........| 734543 | Null................. |
....
因此,并非必须为所有时隙定义每个参数。缺失值应填充 Null。并且可能会有 10,000 个参数和大约 1000 个时隙。
我现在这样做的方式从效率来看似乎非常糟糕:
case class Elements(name: String, value: Int, time: Int)
case class GroupedObjects(time: Int, params: (String, Int)*)
//elements contains the seq of Element
val elementsRdd: RDD[Elements] = sc.parallelize(elements)
val groupedRDD: RDD[GroupedObjects] = elementsRdd
.groupBy(element => element.time)
.map(tuple => GroupedObjects(tuple._1, tuple._2.map(element =>
(element.name, element.value)).toSeq: _*))
//transforming back to json string to get right format for RDD
val jsonRDD: RDD[String] = groupedRDD.map { obj =>
"{"time":" + obj.time + obj.params.map(tuple =>
","" + tuple._1 + "":" + tuple._2).reduce(_ + _) + "}"
}
val df = sqlContext.read.json(jsonRDD).orderBy("time")
df.show(10)
我在这里看到的问题肯定是更改回字符串,只是以正确的格式再次读取它。我真的很高兴能帮助我展示如何以所需的数据帧格式获取输入案例类。
以我现在的做法,它真的很慢,我得到了 10,000,000 个输入行的堆大小异常。
您可以尝试构建 Row 对象并手动定义 RDD 模式,类似于以下示例:
// These extra imports will be required if you don't have them already
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
//elements contains the seq of Element
val elementsRdd = sc.parallelize(elements)
val columnNames = elementsRdd.map(_.name).distinct().collect().sorted
val pivoted = elementsRdd.groupBy(_.time).map {
case (time, elemsByTime) =>
val valuesByColumnName = elemsByTime.groupBy(_.name).map {
case (name, elemsByTimeAndName) => (name, elemsByTimeAndName.map(_.value).sum)
}
val allValuesForRow = columnNames.map(valuesByColumnName.getOrElse(_, null))
(time, allValuesForRow)
}
val schema = StructType(StructField("Time", IntegerType) :: columnNames.map(columnName => StructField(columnName, IntegerType, nullable = true)).toList)
val rowRDD = pivoted.map(p => Row.fromSeq(p._1 :: p._2.toList))
val df = sqlContext.createDataFrame(rowRDD, schema)
df.show(10)
我在本地尝试了 10,000,000 个元素,如下所示:
val elements = (1 to 10000000).map(i => Element("Param" + (i % 1000).toString, i + 100, i % 10000))
并在合理的时间内成功完成。
从Spark 1.6开始,有一个pivot
函数。它适用于数据帧。由于您使用的是案例类,因此这就像:
val elementsRdd: RDD[Elements] = sc.parallelize(elements)
val elementsDF = elementsRdd.toDF()
然后,您可以执行以下操作:
elementsDF.groupBy($"time").pivot(...)
有关pivot()
的更多信息,请参阅GroupedData
文档,但这应该足以让您继续。