I am using Spark 1.6.0. I have an input RDD of (key, value) pairs and want to convert it into a dataframe.
Input RDD format:
((1, A, ABC), List(pz,A1))
((2, B, PQR), List(az,B1))
((3, C, MNR), List(cs,c1))
Desired output format:
+----+----+-----+----+----+
| c1 | c2 | c3  | c4 | c5 |
+----+----+-----+----+----+
| 1  | A  | ABC | pz | A1 |
| 2  | B  | PQR | az | B1 |
| 3  | C  | MNR | cs | c1 |
+----+----+-----+----+----+
Can someone help me?
I suggest you use datasets, since a dataset is essentially an optimized, type-safe dataframe.
First, you need to create a case class:
case class table(c1: Int, c2: String, c3: String, c4:String, c5:String)
Then you just need a map function to parse the data into the case class, and call .toDS:
rdd.map(x => table(x._1._1, x._1._2, x._1._3, x._2(0), x._2(1))).toDS().show()
You should get the following output:
+---+---+---+---+---+
| c1| c2| c3| c4| c5|
+---+---+---+---+---+
| 1| A|ABC| pz| A1|
| 2| B|PQR| az| B1|
| 3| C|MNR| cs| c1|
+---+---+---+---+---+
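For completeness, here is a minimal, self-contained sketch of the above. The SparkContext/SQLContext setup and the sample RDD are assumptions (in a spark-shell, sc and sqlContext already exist), and in Spark 1.6 Datasets are still marked experimental:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumed Spark 1.6 setup; skip this block in spark-shell.
val conf = new SparkConf().setAppName("rdd-to-dataset").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._   // brings .toDS() / .toDF() into scope

// Same case class as above; define it at top level if you compile this.
case class table(c1: Int, c2: String, c3: String, c4: String, c5: String)

// Hypothetical RDD shaped like the question's input: ((Int, String, String), List[String])
val rdd = sc.parallelize(Seq(
  ((1, "A", "ABC"), List("pz", "A1")),
  ((2, "B", "PQR"), List("az", "B1")),
  ((3, "C", "MNR"), List("cs", "c1"))))

// Parse each record into the case class and convert to a Dataset.
rdd.map(x => table(x._1._1, x._1._2, x._1._3, x._2(0), x._2(1))).toDS().show()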
You can also use a dataframe; to do that, call .toDF() instead of .toDS().
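For example, a sketch reusing the rdd and the table case class from above (the column names then come from the case class fields):

rdd.map(x => table(x._1._1, x._1._2, x._1._3, x._2(0), x._2(1))).toDF().show()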
import sqlContext.implicits._   // needed for .toDF() on an RDD

val a = Seq(
  ((1, "A", "ABC"), List("pz", "A1")),
  ((2, "B", "PQR"), List("az", "B1")),
  ((3, "C", "MNR"), List("cs", "c1")))
val a1 = sc.parallelize(a)
val a2 = a1.map(rec => (rec._1._1, rec._1._2, rec._1._3, rec._2(0), rec._2(1))).toDF()
a2.show()
+---+---+---+---+---+
| _1| _2| _3| _4| _5|
+---+---+---+---+---+
|  1|  A|ABC| pz| A1|
|  2|  B|PQR| az| B1|
|  3|  C|MNR| cs| c1|
+---+---+---+---+---+
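If you want the c1..c5 headers from the question instead of the default _1.._5 names, one option (a sketch passing explicit column names to toDF) is:

// Pass explicit column names to toDF to replace the default _1.._5 names.
val a3 = a1.map(rec => (rec._1._1, rec._1._2, rec._1._3, rec._2(0), rec._2(1))).toDF("c1", "c2", "c3", "c4", "c5")
a3.show()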