I wrote the following code:
val list = List(
  Map("empid" -> "12", "empName" -> "Rohan", "depId" -> "201"),
  Map("empid" -> "13", "empName" -> "swathi", "depId" -> "202")
).flatten.toMap
val mapRDD = sc.parallelize(Seq(list))
val columns = mapRDD.take(1).flatMap(a => a.keys)
val columnval = mapRDD.take(2).flatMap(a => a.keys)
val resultantDF = mapRDD.map { value =>
  val list = value.values.toList
  (list(0), list(1), list(2))
}.toDF(columns: _*)
resultantDF.show()
I expected the following output:
+-----+-------+-----+
|empid|empName|depId|
+-----+-------+-----+
|   12|  Rohan|  201|
|   13| swathi|  202|
+-----+-------+-----+
But I only get:
+-----+-------+-----+
|empid|empName|depId|
+-----+-------+-----+
|   13| swathi|  202|
+-----+-------+-----+
Please let me know where I am going wrong.
The problem is only in your first line:
scala> val list = List(
| Map("empid" -> "12", "empName" -> "Rohan", "depId" -> "201"),
| Map("empid" -> "13", "empName" -> "swathi", "depId" -> "202")
| ).flatten.toMap
// list: scala.collection.immutable.Map[String,String] = Map(empid -> 13, empName -> swathi, depId -> 202)
Your list actually ends up being a Map, and a Map can hold only one value per key.
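To see the one-value-per-key rule in isolation, here is a minimal plain-Scala sketch with no Spark involved. The object name is hypothetical and only serves to make the snippet runnable on its own.

```scala
// Illustrative object name (hypothetical), not from the original post
object OneValuePerKey {
  // Start with a single entry for "empid"
  val first: Map[String, String] = Map("empid" -> "12")

  // Adding a pair whose key already exists replaces the old value;
  // the Map does not gain a second "empid" entry
  val second: Map[String, String] = first + ("empid" -> "13")

  def main(args: Array[String]): Unit = {
    println(second)      // Map(empid -> 13)
    println(second.size) // 1
  }
}
```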
Let's go through that first line step by step.
First, you create a list of maps:
scala> val listOfMaps = List(
| Map("empid" -> "12", "empName" -> "Rohan", "depId" -> "201"),
| Map("empid" -> "13", "empName" -> "swathi", "depId" -> "202")
| )
// listOfMaps: List[scala.collection.immutable.Map[String,String]] = List(Map(empid -> 12, empName -> Rohan, depId -> 201), Map(empid -> 13, empName -> swathi, depId -> 202))
Then you flatten the maps inside listOfMaps, which gives you a list of key-value pairs:
scala> val flattenedListOfMaps = listOfMaps.flatten
// flattenedListOfMaps: List[(String, String)] = List((empid,12), (empName,Rohan), (depId,201), (empid,13), (empName,swathi), (depId,202))
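Now you convert that list into a Map with toMap. Because every key appears twice, each later pair overwrites the value of the earlier pair with the same key, and you end up with a Map of unique keys that holds only the last values: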
scala> val yourMap = flattenedListOfMaps.toMap
// yourMap: scala.collection.immutable.Map[String,String] = Map(empid -> 13, empName -> swathi, depId -> 202)
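To make the overwriting explicit: for a sequence of pairs, toMap gives the same result as folding the pairs into an empty Map from left to right, where each + replaces any earlier value stored under the same key. A minimal plain-Scala sketch (the object and value names are hypothetical, chosen only for illustration):

```scala
// Illustrative object name (hypothetical), not from the original post
object ToMapAsFold {
  val listOfMaps: List[Map[String, String]] = List(
    Map("empid" -> "12", "empName" -> "Rohan", "depId" -> "201"),
    Map("empid" -> "13", "empName" -> "swathi", "depId" -> "202")
  )

  // Six (key, value) pairs; every key appears twice
  val flattened: List[(String, String)] = listOfMaps.flatten

  // Insert the pairs one at a time; a repeated key overwrites the earlier value
  val folded: Map[String, String] =
    flattened.foldLeft(Map.empty[String, String])((acc, kv) => acc + kv)

  def main(args: Array[String]): Unit = {
    println(flattened.size)            // 6
    println(folded == flattened.toMap) // true
    println(folded)                    // Map(empid -> 13, empName -> swathi, depId -> 202)
  }
}
```

Only the second employee's values survive, which is exactly the single row that show() printed.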
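As already pointed out in the previous answer and in the comments, your list variable is currently a Map (which is confusing, to say the least). What you probably wanted as input is a list of maps. So you need to do two things: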
1. Get rid of .flatten.toMap:
val list = List(
Map("empid" -> "12", "empName" -> "Rohan", "depId" -> "201"),
Map("empid" -> "13", "empName" -> "swathi", "depId" -> "202")
)
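2. Likewise, when calling sc.parallelize, you no longer need to wrap the input in a separate Seq. In fact, with Seq(list) each RDD element would be the whole List of maps, so the later a.keys calls would not compile. Change it like this: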
val mapRDD = sc.parallelize(list)
With just these two changes you get the expected result, i.e. both records appear in the console output.
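One further caveat, separate from the fix above: the row-building step relies on value.values.toList returning the values in the same order as the keys in columns. Scala's immutable Map keeps insertion order for up to four entries, but larger maps are hash-based and make no such guarantee. A somewhat more robust option is to look each value up by column name. Below is a plain-Scala sketch of that lookup without Spark (the object name is hypothetical); in the Spark code the same lookup would go inside the mapRDD.map closure.

```scala
// Illustrative object name (hypothetical), not from the original post
object RowsByColumnName {
  val list: List[Map[String, String]] = List(
    Map("empid" -> "12", "empName" -> "Rohan", "depId" -> "201"),
    Map("empid" -> "13", "empName" -> "swathi", "depId" -> "202")
  )

  // Column names taken from the first map, as in the question
  val columns: List[String] = list.head.keys.toList

  // A Map is also a function from key to value, so columns.map(m)
  // fetches the values in column order, whatever the map's iteration order
  val rows: List[(String, String, String)] = list.map { m =>
    val values = columns.map(m)
    (values(0), values(1), values(2))
  }

  def main(args: Array[String]): Unit =
    rows.foreach(println)
}
```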