我在Hive表中有一列:
列名称:筛选器
数据类型:
|-- filters: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- value: string (nullable = true)
我想通过其相应的名称从此列中获取值。
到目前为止我做了什么:
val sdf: DataFrame = sqlContext.sql("select * from <tablename> where id='12345'")
val sdfFilters = sdf.select("filters").rdd.map(r => r(0).asInstanceOf[Seq[(String,String)]]).collect()
Output: sdfFilters: Array[Seq[(String, String)]] = Array(WrappedArray([filter_RISKFACTOR,OIS.SPD.*], [filter_AGGCODE,IR]), WrappedArray([filter_AGGCODE,IR_]))
注意:强制转换为 Seq,因为无法将包装数组转换为映射。
下一步该怎么做?
I want to get the value from this column by it's corresponding name.
如果您想要简单可靠的方法来按名称获取所有值,您可以使用分解和过滤器展平表:
case class Data(name: String, value: String)
case class Filters(filters: Array[Data])
val df = sqlContext.createDataFrame(Seq(Filters(Array(Data("a", "b"), Data("a", "c"))), Filters(Array(Data("b", "c")))))
df.show()
+--------------+
| filters|
+--------------+
|[[a,b], [a,c]]|
| [[b,c]]|
+--------------+
df.withColumn("filter", explode($"filters"))
.select($"filter.name" as "name", $"filter.value" as "value")
.where($"name" === "a")
.show()
+----+-----+
|name|value|
+----+-----+
| a| b|
| a| c|
+----+-----+
您还可以以任何您想要的方式收集数据:
val flatDf = df.withColumn("filter", explode($"filters")).select($"filter.name" as "name", $"filter.value" as "value")
flatDf.rdd.map(r => Array(r(0), r(1))).collect()
res0: Array[Array[Any]] = Array(Array(a, b), Array(a, c), Array(b, c))
flatDf.rdd.map(r => r(0) -> r(1)).groupByKey().collect() //not the best idea, if you have many values per key
res1: Array[(Any, Iterable[Any])] = Array((b,CompactBuffer(c)), (a,CompactBuffer(b, c)))
如果您想将array[struct]
转换为map[string, string]
以便将来保存到某些存储中 - 这是另一回事,这种情况最好由 UDF 解决。无论如何,您必须尽可能避免collect()
以保持代码的可伸缩性。