Incrementing the count in a key-value pair column of a Spark Scala DataFrame



I'm new to Spark and Scala, and I'm trying to increment the value of a key-value pair in one column based on the value of another column.

Below is the input DataFrame:

val inputDF = Seq(
  (1, "Visa", 1, None),
  (2, "MC", 2, Some("Visa -> 1")),
  (3, "Amex", 1, None),
  (4, "Amex", 3, Some("Visa -> 1, MC -> 1")),
  (5, "Amex", 4, Some("Visa -> 2, MC -> 1")),
  (6, "MC", 1, None),
  (7, "Visa", 5, Some("Visa -> 2, MC -> 1, Amex -> 1")),
  (8, "Visa", 6, Some("Visa -> 2, MC -> 2, Amex -> 1")),
  (9, "MC", 1, None),
  (10, "MC", 2, Some("Amex -> 1"))
).toDF("person_id", "card_type", "number_of_cards", "card_type_details")
+---------+---------+---------------+-----------------------------+
|person_id|card_type|number_of_cards|card_type_details            |
+---------+---------+---------------+-----------------------------+
|1        |Visa     |1              |null                         |
|2        |MC       |2              |Visa -> 1                    |
|3        |Amex     |1              |null                         |
|4        |Amex     |3              |Visa -> 1, MC -> 1           |
|5        |Amex     |4              |Visa -> 2, MC -> 1           |
|6        |MC       |1              |null                         |
|7        |Visa     |5              |Visa -> 2, MC -> 1, Amex -> 1|
|8        |Visa     |6              |Visa -> 2, MC -> 2, Amex -> 1|
|9        |MC       |1              |null                         |
|10       |MC       |2              |Amex -> 1                    |
+---------+---------+---------------+-----------------------------+

Based on the input above: if card_type_details is null, take the value of card_type and add ` -> 1` to it (as in the first row).

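If card_type_details is not null, check whether card_type already exists as a key in card_type_details. If it does, increment the value for that key by 1; otherwise, add a new key-value pair `card_type -> 1` (see the second and seventh rows).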
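To make the rule concrete, here is a minimal plain-Scala sketch of it, assuming the string format shown in the input (`Key -> count` pairs separated by `", "`). `CardDetails`, `parse`, `format` and `update` are hypothetical names, not part of Spark. It keeps the existing key order: an existing key is incremented in place and a new key is appended at the end, which matches the expected output below.

```scala
// Hypothetical helper (not from the original post): applies the update rule
// to the question's string format, e.g. "Visa -> 2, MC -> 1".
object CardDetails {
  // Parse "Visa -> 2, MC -> 1" into ordered (key, count) pairs.
  def parse(s: String): Vector[(String, Int)] =
    s.split(", ").toVector.map { pair =>
      val parts = pair.split(" -> ")
      (parts(0).trim, parts(1).trim.toInt)
    }

  // Turn ordered (key, count) pairs back into "Visa -> 2, MC -> 1".
  def format(pairs: Vector[(String, Int)]): String =
    pairs.map { case (k, v) => s"$k -> $v" }.mkString(", ")

  // None (or a blank string) -> "cardType -> 1";
  // existing key -> increment its count in place;
  // new key -> append "cardType -> 1" at the end.
  def update(cardType: String, details: Option[String]): String =
    details.filter(_.trim.nonEmpty) match {
      case None => s"$cardType -> 1"
      case Some(s) =>
        val pairs = parse(s)
        val idx = pairs.indexWhere(_._1 == cardType)
        if (idx >= 0) format(pairs.updated(idx, (cardType, pairs(idx)._2 + 1)))
        else format(pairs :+ (cardType -> 1))
    }
}
```

If a UDF-based approach were acceptable, this could presumably be wrapped with `org.apache.spark.sql.functions.udf`, taking `card_type_details` as a nullable `String` and passing `Option(details)`, though a UDF is opaque to Spark's optimizer, unlike the built-in functions used in the answer.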

The expected output is:

val expectedOutputDF = Seq(
  (1, "Visa", 1, Some("Visa -> 1")),
  (2, "MC", 2, Some("Visa -> 1, MC -> 1")),
  (3, "Amex", 1, Some("Amex -> 1")),
  (4, "Amex", 3, Some("Visa -> 1, MC -> 1, Amex -> 1")),
  (5, "Amex", 4, Some("Visa -> 2, MC -> 1, Amex -> 1")),
  (6, "MC", 1, Some("MC -> 1")),
  (7, "Visa", 5, Some("Visa -> 3, MC -> 1, Amex -> 1")),
  (8, "Visa", 6, Some("Visa -> 3, MC -> 2, Amex -> 1")),
  (9, "MC", 1, Some("MC -> 1")),
  (10, "MC", 2, Some("Amex -> 1, MC -> 1"))
).toDF("person_id", "card_type", "number_of_cards", "card_type_details")
+---------+---------+---------------+-----------------------------+
|person_id|card_type|number_of_cards|card_type_details            |
+---------+---------+---------------+-----------------------------+
|1        |Visa     |1              |Visa -> 1                    |
|2        |MC       |2              |Visa -> 1, MC -> 1           |
|3        |Amex     |1              |Amex -> 1                    |
|4        |Amex     |3              |Visa -> 1, MC -> 1, Amex -> 1|
|5        |Amex     |4              |Visa -> 2, MC -> 1, Amex -> 1|
|6        |MC       |1              |MC -> 1                      |
|7        |Visa     |5              |Visa -> 3, MC -> 1, Amex -> 1|
|8        |Visa     |6              |Visa -> 3, MC -> 2, Amex -> 1|
|9        |MC       |1              |MC -> 1                      |
|10       |MC       |2              |Amex -> 1, MC -> 1           |
+---------+---------+---------------+-----------------------------+

Any suggestions on how to achieve this?

Assuming card_type_details is a map column, check the following code.

scala> df.show(false)
+---------+---------+---------------+-----------------------------+
|person_id|card_type|number_of_cards|card_type_details            |
+---------+---------+---------------+-----------------------------+
|1        |Visa     |1              |null                         |
|2        |MC       |2              |Visa -> 1                    |
|3        |Amex     |1              |null                         |
|4        |Amex     |3              |Visa -> 1, MC -> 1           |
|5        |Amex     |4              |Visa -> 2, MC -> 1           |
|6        |MC       |1              |null                         |
|7        |Visa     |5              |Visa -> 2, MC -> 1, Amex -> 1|
|8        |Visa     |6              |Visa -> 2, MC -> 2, Amex -> 1|
|9        |MC       |1              |null                         |
|10       |MC       |2              |Amex -> 1                    |
+---------+---------+---------------+-----------------------------+

Create the column expression:

scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.functions._

// 1. card_type_details is null or empty: start a new map card_type -> 1.
//    size(null) returns -1 (or null under ANSI mode), never 0, so null rows need the isNull check.
// 2. card_type is already a key: put card_type -> (old value + 1) first, then the remaining
//    entries; map_filter (Spark 3.0+) drops the old key so map_concat gets no duplicate key.
// 3. Otherwise: append card_type -> 1 to the existing map.
val colExpr = when($"card_type_details".isNull || size($"card_type_details") === 0, map($"card_type", lit(1)))
  .otherwise(
    when(
      expr("card_type_details[card_type]").isNotNull,
      map_concat(
        expr("map(card_type, card_type_details[card_type] + 1)"),
        expr("map_filter(card_type_details, (k, v) -> k != card_type)")
      )
    )
    .otherwise(map_concat($"card_type_details", map($"card_type", lit(1))))
  )
// Exiting paste mode, now interpreting.
scala> indf.withColumn("new_card_type_details",colExpr).show(false)
+---------+---------+---------------+-------------------------------+-------------------------------+
|person_id|card_type|number_of_cards|card_type_details              |new_card_type_details          |
+---------+---------+---------------+-------------------------------+-------------------------------+
|1        |Visa     |1              |[]                             |[Visa -> 1]                    |
|2        |MC       |2              |[Visa -> 1]                    |[Visa -> 1, MC -> 1]           |
|3        |Amex     |1              |[]                             |[Amex -> 1]                    |
|4        |Amex     |3              |[Visa -> 1, MC -> 1]           |[Visa -> 1, MC -> 1, Amex -> 1]|
|5        |Amex     |4              |[Visa -> 2, MC -> 1]           |[Visa -> 2, MC -> 1, Amex -> 1]|
|6        |MC       |1              |[]                             |[MC -> 1]                      |
|7        |Visa     |5              |[Visa -> 2, MC -> 1, Amex -> 1]|[Visa -> 3, MC -> 1, Amex -> 1]|
|8        |Visa     |6              |[Visa -> 2, MC -> 2, Amex -> 1]|[Visa -> 3, MC -> 2, Amex -> 1]|
|9        |MC       |1              |[]                             |[MC -> 1]                      |
|10       |MC       |2              |[Amex -> 1]                    |[Amex -> 1, MC -> 1]           |
+---------+---------+---------------+-------------------------------+-------------------------------+
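The expression above assumes card_type_details is already a map, whereas the question builds it from strings such as "Visa -> 1, MC -> 1". A minimal sketch of converting between the two representations, assuming Spark 3.x (`map_entries`, like the `map_filter` used above, needs 3.0+) and exactly the `", "` and `" -> "` delimiters from the question. `DetailsColumns`, `toMapColumn` and `toStringColumn` are hypothetical names, not Spark APIs; `str_to_map`, `map_entries`, `transform` and `concat_ws` are Spark SQL built-ins.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, expr, when}

// Hypothetical helpers (not part of the original answer) for moving between the
// question's string format ("Visa -> 2, MC -> 1") and a map<string,int> column.
object DetailsColumns {
  // str_to_map(text, pairDelim, keyValueDelim) returns map<string,string>;
  // casting the values to int lets "card_type_details[card_type] + 1" do integer math.
  // A null string stays null.
  def toMapColumn(df: DataFrame, colName: String): DataFrame =
    df.withColumn(
      colName,
      expr(s"str_to_map($colName, ', ', ' -> ')").cast("map<string,int>"))

  // Rebuild "k -> v, k -> v" from the map entries in map order; null stays null
  // (when without otherwise yields null for rows where the map is null).
  def toStringColumn(df: DataFrame, colName: String): DataFrame =
    df.withColumn(
      colName,
      when(
        col(colName).isNotNull,
        expr(s"concat_ws(', ', transform(map_entries($colName), e -> concat(e.key, ' -> ', cast(e.value AS string))))")))
}
```

For example, `DetailsColumns.toMapColumn(inputDF, "card_type_details")` should give a map column that `colExpr` can work on, and `toStringColumn` can turn the result back into the string layout of `expectedOutputDF`. Because `colExpr` puts an incremented key first in the map, the rebuilt string follows that order; in this sample data the incremented key (Visa in rows 7 and 8) is already first.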
