我有带有地图的表格。我想从该地图中制作 2 个单独的列 - 1。键列 2.值列。
input.show();
+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+
|addedSkuWithTimestamp| fbaSKUAdditions|fbaSKURemovals| merchantId|mfnSKUAdditions|mfnSKURemovals|removedSkuWithTimestamp|
+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+
| [Test1 -> 1234567...|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []|ANOTHER_MERCHANT| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []|ANOTHER_MERCHANT| []| null| null|
+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+
但我希望输出为
test1 123456789
Test2 123456780
如何从地图中获取 2 个不同的列(键列和值列)?
Dataset<Row> removed_skus = input
.withColumn("sku", functions.explode(input.col("removedSkuWithTimestamp")))
.withColumn("skuType", functions.lit("MFN"))
.select(input.col("merchantId").alias("merchant_id"), new Column("sku").,
new Column("skuType"))
.distinct()
.groupBy("merchant_id")
.agg(functions.collect_list("sku").alias("removedSkus"));
使用来自其他答案的相同输入
val df = Seq(
(Map("timestamp1"->1585008000, "timestamp3"-> 1584921600), "AFN"),
(Map("timestamp2"-> 1584835200), "AFN"),
(null, "AFN")
).toDF("addedSkuWithTimestamp", "skuType")
尝试使用爆炸,我在 Spark 2.2.1 和 2.3.1 中对此进行了测试
df.select(explode($"addedSkuWithTimestamp")).show(false)
+----------+----------+
|key |value |
+----------+----------+
|timestamp1|1585008000|
|timestamp3|1584921600|
|timestamp2|1584835200|
+----------+----------+
首先,让我们创建一些数据:
val df = Seq(
(Map("sku1"->"timestamp1"), "AFN"),
(Map("sku2"->"timestamp2"), "AFN"),
(null, "AFN")
).toDF("addedSkuWithTimestamp", "skuType")
.show(false)
+---------------------+-------+
|addedSkuWithTimestamp|skuType|
+---------------------+-------+
| [sku1 -> timestamp1]| AFN|
| [sku2 -> timestamp2]| AFN|
| null| AFN|
+---------------------+-------+
这将具有以下架构:
scala> df.printSchema()
root
|-- addedSkuWithTimestamp: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- skuType: string (nullable = true)
火花<2.3
下一个代码将使用mapToTupleUDF
udf 函数从addSkuWithTimestamp列中提取sku_key和sku_value列:
val mapToTupleUDF = udf((sku: Map[String, String]) => if(sku != null) sku.toSeq(0) else null)
df.withColumn("addedSkuWithTimestamp", mapToTupleUDF($"addedSkuWithTimestamp"))
.withColumn("Sku", when($"addedSkuWithTimestamp".isNotNull, $"addedSkuWithTimestamp._1"))
.withColumn("Timestamp", when($"addedSkuWithTimestamp".isNotNull, $"addedSkuWithTimestamp._2"))
.show(false)
+---------------------+-------+----+----------+
|addedSkuWithTimestamp|skuType|Sku |Timestamp |
+---------------------+-------+----+----------+
|[sku1, timestamp1] |AFN |sku1|timestamp1|
|[sku2, timestamp2] |AFN |sku2|timestamp2|
|null |AFN |null|null |
+---------------------+-------+----+----------+
请注意,只有当addedSkuWithTimestamp
不为 null时,我们才能访问addedSkuWithTimestamp._1
。
火花>= 2.3
从Spark 2.3.0开始,您可以使用内置map_values
并map_keys
:
df.withColumn("Sku", map_keys($"addedSkuWithTimestamp").getItem(0))
.withColumn("Timestamp", map_values($"addedSkuWithTimestamp").getItem(0))
.show(false)
输出:
+---------------------+-------+----+----------+
|addedSkuWithTimestamp|skuType|Sku |Timestamp |
+---------------------+-------+----+----------+
|[sku1 -> timestamp1] |AFN |sku1|timestamp1|
|[sku2 -> timestamp2] |AFN |sku2|timestamp2|
|null |AFN |null|null |
+---------------------+-------+----+----------+