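I'm new to Spark and Scala, and I'm trying to increment the value of a key-value pair in one column using the value from another column.
Below is the input DataFrame: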
val inputDF = Seq(
(1, "Visa", 1, None),
(2, "MC", 2, Some("Visa -> 1")),
(3, "Amex", 1, None),
(4, "Amex", 3, Some("Visa -> 1, MC -> 1")),
(5, "Amex", 4, Some("Visa -> 2, MC -> 1")),
(6, "MC", 1, None),
(7, "Visa", 5, Some("Visa -> 2, MC -> 1, Amex -> 1")),
(8, "Visa", 6, Some("Visa -> 2, MC -> 2, Amex -> 1")),
(9, "MC", 1, None),
(10, "MC", 2, Some("Amex -> 1"))).toDF("person_id", "card_type", "number_of_cards", "card_type_details")
+---------+---------+---------------+-----------------------------+
|person_id|card_type|number_of_cards|card_type_details            |
+---------+---------+---------------+-----------------------------+
|1        |Visa     |1              |null                         |
|2        |MC       |2              |Visa -> 1                    |
|3        |Amex     |1              |null                         |
|4        |Amex     |3              |Visa -> 1, MC -> 1           |
|5        |Amex     |4              |Visa -> 2, MC -> 1           |
|6        |MC       |1              |null                         |
|7        |Visa     |5              |Visa -> 2, MC -> 1, Amex -> 1|
|8        |Visa     |6              |Visa -> 2, MC -> 2, Amex -> 1|
|9        |MC       |1              |null                         |
|10       |MC       |2              |Amex -> 1                    |
+---------+---------+---------------+-----------------------------+
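Now, based on the input above: if card_type_details is null, take the value from card_type and add -> 1 (as in the first row).
If card_type_details is not null, check whether card_type already exists as a key in card_type_details. If it does, increment that key's value by 1 (as in the seventh row); otherwise, add a new key-value pair (as in the second row).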
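The rule above can be sketched in plain Scala, outside Spark, to pin down the expected behaviour. This is a minimal sketch under assumptions: card_type_details is modelled as Option[ListMap[String, Int]] (None standing in for null), and CardRule and updateCounts are hypothetical names. ListMap keeps insertion order, so an incremented key stays where it was and a new key is appended at the end, which matches the expected output.

```scala
import scala.collection.immutable.ListMap

// Hypothetical helper illustrating the per-row rule from the question (not Spark code).
object CardRule {
  // details = None plays the role of a null card_type_details value.
  def updateCounts(cardType: String, details: Option[ListMap[String, Int]]): ListMap[String, Int] =
    details match {
      // null: start a new map containing only "cardType -> 1"
      case None => ListMap(cardType -> 1)
      // key already present: increment its count, keeping every key in its position
      case Some(m) if m.contains(cardType) =>
        m.map { case (k, v) => if (k == cardType) k -> (v + 1) else k -> v }
      // key absent: append "cardType -> 1" at the end
      case Some(m) => m + (cardType -> 1)
    }
}
```

For example, updateCounts("Visa", Some(ListMap("Visa" -> 2, "MC" -> 1, "Amex" -> 1))) should give Visa -> 3, MC -> 1, Amex -> 1, as in the seventh row.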
Here is the expected output:
val expectedOutputDF = Seq(
(1, "Visa", 1, Some("Visa -> 1")),
(2, "MC", 2, Some("Visa -> 1, MC -> 1")),
(3, "Amex", 1, Some("Amex -> 1")),
(4, "Amex", 3, Some("Visa -> 1, MC -> 1, Amex -> 1")),
(5, "Amex", 4, Some("Visa -> 2, MC -> 1, Amex -> 1")),
(6, "MC", 1, Some("MC -> 1")),
(7, "Visa", 5, Some("Visa -> 3, MC -> 1, Amex -> 1")),
(8, "Visa", 6, Some("Visa -> 3, MC -> 2, Amex -> 1")),
(9, "MC", 1, Some("MC -> 1")),
(10, "MC", 2, Some("Amex -> 1, MC -> 1"))).toDF("person_id", "card_type", "number_of_cards", "card_type_details")
+---------+---------+---------------+-----------------------------+
|person_id|card_type|number_of_cards|card_type_details            |
+---------+---------+---------------+-----------------------------+
|1        |Visa     |1              |Visa -> 1                    |
|2        |MC       |2              |Visa -> 1, MC -> 1           |
|3        |Amex     |1              |Amex -> 1                    |
|4        |Amex     |3              |Visa -> 1, MC -> 1, Amex -> 1|
|5        |Amex     |4              |Visa -> 2, MC -> 1, Amex -> 1|
|6        |MC       |1              |MC -> 1                      |
|7        |Visa     |5              |Visa -> 3, MC -> 1, Amex -> 1|
|8        |Visa     |6              |Visa -> 3, MC -> 2, Amex -> 1|
|9        |MC       |1              |MC -> 1                      |
|10       |MC       |2              |Amex -> 1, MC -> 1           |
+---------+---------+---------------+-----------------------------+
Any suggestions on how to achieve this?
Assuming card_type_details is of type map, check the code below.
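The question's card_type_details column is actually a string such as "Visa -> 1, MC -> 1", while the answer assumes a map. A hypothetical plain-Scala sketch of converting between the two forms follows; CardDetailsFormat, parse, and render are illustrative names, and the format (commas between pairs, "->" between key and value) is assumed from the sample data. Inside Spark, the SQL function str_to_map is one possible route to a map column, though it yields string values that would still need a cast; that route is not tested here.

```scala
import scala.collection.immutable.ListMap

// Hypothetical helpers converting between the question's string format
// ("Visa -> 1, MC -> 1") and an insertion-ordered map.
object CardDetailsFormat {
  // Parse "k -> v, k -> v" into a ListMap; None or blank input stays None.
  def parse(s: Option[String]): Option[ListMap[String, Int]] =
    s.map(_.trim).filter(_.nonEmpty).map { text =>
      val pairs = text.split(",").toList.map { pair =>
        pair.split("->") match {
          case Array(k, v) => k.trim -> v.trim.toInt
          case _           => throw new IllegalArgumentException(s"Malformed pair: '$pair'")
        }
      }
      // foldLeft keeps the pairs in their original order
      pairs.foldLeft(ListMap.empty[String, Int])(_ + _)
    }

  // Render a map back into the same "k -> v, k -> v" string.
  def render(m: ListMap[String, Int]): String =
    m.toList.map { case (k, v) => s"$k -> $v" }.mkString(", ")
}
```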
scala> df.show(false)
+---------+---------+---------------+-----------------------------+
|person_id|card_type|number_of_cards|card_type_details            |
+---------+---------+---------------+-----------------------------+
|1        |Visa     |1              |null                         |
|2        |MC       |2              |Visa -> 1                    |
|3        |Amex     |1              |null                         |
|4        |Amex     |3              |Visa -> 1, MC -> 1           |
|5        |Amex     |4              |Visa -> 2, MC -> 1           |
|6        |MC       |1              |null                         |
|7        |Visa     |5              |Visa -> 2, MC -> 1, Amex -> 1|
|8        |Visa     |6              |Visa -> 2, MC -> 2, Amex -> 1|
|9        |MC       |1              |null                         |
|10       |MC       |2              |Amex -> 1                    |
+---------+---------+---------------+-----------------------------+
Creating the expression:
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.functions._

// Null or empty map: start a new map with card_type -> 1.
// size() on a null map returns -1 or null (depending on configuration), not 0, hence the isNull check.
val colExpr = when($"card_type_details".isNull || size($"card_type_details") === 0, map($"card_type", lit(1)))
  // card_type is already a key: incremented entry first, then the remaining entries
  .when(
    expr("card_type_details[card_type]").isNotNull,
    map_concat(
      expr("map(card_type, card_type_details[card_type] + 1)"),
      expr("map_filter(card_type_details, (k, v) -> k != card_type)")
    )
  )
  // card_type is not a key yet: append card_type -> 1
  .otherwise(map_concat($"card_type_details", map($"card_type", lit(1))))
// Exiting paste mode, now interpreting.
scala> df.withColumn("new_card_type_details", colExpr).show(false)
+---------+---------+---------------+-------------------------------+-------------------------------+
|person_id|card_type|number_of_cards|card_type_details              |new_card_type_details          |
+---------+---------+---------------+-------------------------------+-------------------------------+
|1        |Visa     |1              |[]                             |[Visa -> 1]                    |
|2        |MC       |2              |[Visa -> 1]                    |[Visa -> 1, MC -> 1]           |
|3        |Amex     |1              |[]                             |[Amex -> 1]                    |
|4        |Amex     |3              |[Visa -> 1, MC -> 1]           |[Visa -> 1, MC -> 1, Amex -> 1]|
|5        |Amex     |4              |[Visa -> 2, MC -> 1]           |[Visa -> 2, MC -> 1, Amex -> 1]|
|6        |MC       |1              |[]                             |[MC -> 1]                      |
|7        |Visa     |5              |[Visa -> 2, MC -> 1, Amex -> 1]|[Visa -> 3, MC -> 1, Amex -> 1]|
|8        |Visa     |6              |[Visa -> 2, MC -> 2, Amex -> 1]|[Visa -> 3, MC -> 2, Amex -> 1]|
|9        |MC       |1              |[]                             |[MC -> 1]                      |
|10       |MC       |2              |[Amex -> 1]                    |[Amex -> 1, MC -> 1]           |
+---------+---------+---------------+-------------------------------+-------------------------------+
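In the answer's increment branch, map_filter removes the existing card_type entry before map_concat adds the incremented one. That matters because, from Spark 3.0, map_concat by default fails on duplicate keys unless spark.sql.mapKeyDedupPolicy is set to LAST_WIN. One side effect is that the incremented key moves to the front of the map; rows 7 and 8 happen not to show it because Visa is already the first key. The hypothetical plain-Scala mirror below (AnswerIncrementMirror is an illustrative name, and this is not Spark code) reproduces that ordering.

```scala
import scala.collection.immutable.ListMap

// Hypothetical plain-Scala mirror of the answer's increment branch:
//   map_concat(map(card_type, m[card_type] + 1), map_filter(m, (k, v) -> k != card_type))
// Assumes cardType is already a key of m, as that branch does.
object AnswerIncrementMirror {
  def increment(cardType: String, m: ListMap[String, Int]): ListMap[String, Int] = {
    // map(card_type, m[card_type] + 1): the bumped entry comes first
    val bumped = ListMap(cardType -> (m(cardType) + 1))
    // map_filter(...): drop the old entry so the concatenation has no duplicate key
    val rest = m.filter { case (k, _) => k != cardType }
    bumped ++ rest
  }
}
```

If key order matters, transform_values (Spark 3.0+), for example expr("transform_values(card_type_details, (k, v) -> IF(k = card_type, v + 1, v))"), may be an order-preserving alternative for the increment case; treat that as an untested suggestion.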