How to extract the keys/values of a map into separate columns in Spark SQL



I have a table with a map column. I want to produce 2 separate columns from that map - 1. a key column 2. a value column.

input.show();
+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+
|addedSkuWithTimestamp|     fbaSKUAdditions|fbaSKURemovals|      merchantId|mfnSKUAdditions|mfnSKURemovals|removedSkuWithTimestamp|
+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+
| [Test1 -> 1234567...|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|ANOTHER_MERCHANT|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|ANOTHER_MERCHANT|             []|          null|                   null|
+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+

But I want the output to be:

test1  123456789 
Test2  123456780 

How do I get 2 different columns (a key column and a value column) from the map?

Dataset<Row> removed_skus = input
    .withColumn("sku", functions.explode(input.col("removedSkuWithTimestamp")))
    .withColumn("skuType", functions.lit("MFN"))
    .select(input.col("merchantId").alias("merchant_id"), new Column("sku"),
            new Column("skuType"))
    .distinct()
    .groupBy("merchant_id")
    .agg(functions.collect_list("sku").alias("removedSkus"));
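
For reference, explode on a map column already yields a key column and a value column, one row per map entry. A minimal Scala sketch of that idea against the question's input (the output names sku and removedTimestamp are only illustrative):

import org.apache.spark.sql.functions._

// explode() on a MapType column generates two columns, "key" and "value",
// one row per map entry; rows whose map is null are dropped (explode_outer keeps them).
input
  .select(col("merchantId"), explode(col("removedSkuWithTimestamp")))
  .withColumnRenamed("key", "sku")
  .withColumnRenamed("value", "removedTimestamp")
  .show(false)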

Using the same input as the other answer:

val df = Seq(
  (Map("timestamp1" -> 1585008000, "timestamp3" -> 1584921600), "AFN"),
  (Map("timestamp2" -> 1584835200), "AFN"),
  (null, "AFN")
).toDF("addedSkuWithTimestamp", "skuType")

Try using explode. I tested this in Spark 2.2.1 and 2.3.1:

df.select(explode($"addedSkuWithTimestamp")).show(false)
+----------+----------+
|key       |value     |
+----------+----------+
|timestamp1|1585008000|
|timestamp3|1584921600|
|timestamp2|1584835200|
+----------+----------+
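
If you also need to keep other columns or give the generated pair more meaningful names, the exploded columns can be aliased in the same select; a small sketch on the same df (the names Sku and Timestamp are just illustrative, and explode drops the row whose map is null):

// assumes import org.apache.spark.sql.functions._ and import spark.implicits._, as in spark-shell
// keep skuType and alias the generated key/value columns in one go
df.select($"skuType", explode($"addedSkuWithTimestamp").as(Seq("Sku", "Timestamp")))
  .show(false)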

First, let's create some data:

val df = Seq(
  (Map("sku1" -> "timestamp1"), "AFN"),
  (Map("sku2" -> "timestamp2"), "AFN"),
  (null, "AFN")
).toDF("addedSkuWithTimestamp", "skuType")

df.show(false)
+---------------------+-------+
|addedSkuWithTimestamp|skuType|
+---------------------+-------+
| [sku1 -> timestamp1]|    AFN|
| [sku2 -> timestamp2]|    AFN|
|                 null|    AFN|
+---------------------+-------+

This will have the following schema:

scala> df.printSchema()
root
|-- addedSkuWithTimestamp: map (nullable = true)
|    |-- key: string
|    |-- value: string (valueContainsNull = true)
|-- skuType: string (nullable = true)

Spark < 2.3

The following code uses the mapToTupleUDF udf function to extract the Sku and Timestamp columns from the addedSkuWithTimestamp column:

val mapToTupleUDF = udf((sku: Map[String, String]) => if (sku != null) sku.toSeq(0) else null)

df.withColumn("addedSkuWithTimestamp", mapToTupleUDF($"addedSkuWithTimestamp"))
  .withColumn("Sku", when($"addedSkuWithTimestamp".isNotNull, $"addedSkuWithTimestamp._1"))
  .withColumn("Timestamp", when($"addedSkuWithTimestamp".isNotNull, $"addedSkuWithTimestamp._2"))
  .show(false)
+---------------------+-------+----+----------+
|addedSkuWithTimestamp|skuType|Sku |Timestamp |
+---------------------+-------+----+----------+
|[sku1, timestamp1]   |AFN    |sku1|timestamp1|
|[sku2, timestamp2]   |AFN    |sku2|timestamp2|
|null                 |AFN    |null|null      |
+---------------------+-------+----+----------+

Note that we access addedSkuWithTimestamp._1 only when addedSkuWithTimestamp is not null.
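
If a map can hold more than one entry, the same udf idea generalizes for Spark < 2.3 by returning all entries and exploding them into rows; a sketch on the same df (the helper name mapToEntriesUDF is made up for illustration):

// Return every (key, value) pair; the empty Seq makes explode drop rows whose map is null.
val mapToEntriesUDF = udf((sku: Map[String, String]) =>
  if (sku != null) sku.toSeq else Seq.empty[(String, String)])

df.withColumn("entry", explode(mapToEntriesUDF($"addedSkuWithTimestamp")))
  .withColumn("Sku", $"entry._1")
  .withColumn("Timestamp", $"entry._2")
  .drop("entry")
  .show(false)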

Spark >= 2.3

Since Spark 2.3.0, you can use the built-in map_keys and map_values functions:

df.withColumn("Sku", map_keys($"addedSkuWithTimestamp").getItem(0))
.withColumn("Timestamp", map_values($"addedSkuWithTimestamp").getItem(0))
.show(false)

Output:

+---------------------+-------+----+----------+
|addedSkuWithTimestamp|skuType|Sku |Timestamp |
+---------------------+-------+----+----------+
|[sku1 -> timestamp1] |AFN    |sku1|timestamp1|
|[sku2 -> timestamp2] |AFN    |sku2|timestamp2|
|null                 |AFN    |null|null      |
+---------------------+-------+----+----------+
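
Note that getItem(0) only picks the first map entry. If a map may contain several entries, explode (or explode_outer, which also keeps the row whose map is null) gives one row per key/value pair; a short sketch on the same df:

// One row per map entry; explode_outer keeps the null-map row (key/value become null).
df.select($"skuType", explode_outer($"addedSkuWithTimestamp"))
  .withColumnRenamed("key", "Sku")
  .withColumnRenamed("value", "Timestamp")
  .show(false)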
