通过具有新指示器列来触发数据帧组



我需要按"KEY"列分组,需要检查"TYPE_CODE"列是否同时具有"PL"和"JL"值,如果是,那么我需要添加一个指标列作为"Y",否则"N">

例:

//Input Values
val values = List(List("66","PL") ,
List("67","JL") , List("67","PL"),List("67","PO"),
List("68","JL"),List("68","PO")).map(x =>(x(0), x(1)))
import spark.implicits._
//created a dataframe
val cmc = values.toDF("KEY","TYPE_CODE")
cmc.show(false)
------------------------
KEY |TYPE_CODE  |
------------------------
66  |PL |
67  |JL |
67  |PL |
67  |PO |
68  |JL |
68  |PO |
-------------------------

预期输出 :

对于每个"KEY",如果它有"TYPE_CODE"同时具有PL和JL,则Y 否则 N

-----------------------------------------------------
KEY |TYPE_CODE  | Indicator
-----------------------------------------------------
66  |PL         | N
67  |JL         | Y
67  |PL         | Y
67  |PO         | Y
68  |JL         | N
68  |PO         | N
---------------------------------------------------

例如 67 同时具有 PL 和 JL - 所以"Y" 66 只有 PL - 所以"N" 68 只有 JL - 所以"N">

一个选项:

1)收集TYPE_CODE作为清单;

2)检查它是否包含特定的字符串;

3)然后用explode扁平列表:

(cmc.groupBy("KEY")
.agg(collect_list("TYPE_CODE").as("TYPE_CODE"))
.withColumn("Indicator", 
when(array_contains($"TYPE_CODE", "PL") && array_contains($"TYPE_CODE", "JL"), "Y").otherwise("N"))
.withColumn("TYPE_CODE", explode($"TYPE_CODE"))).show
+---+---------+---------+
|KEY|TYPE_CODE|Indicator|
+---+---------+---------+
| 68|       JL|        N|
| 68|       PO|        N|    
| 67|       JL|        Y|
| 67|       PL|        Y|
| 67|       PO|        Y|
| 66|       PL|        N|
+---+---------+---------+

另一种选择:

  1. KEY分组并使用agg创建两个单独的指标列(一个用于JL,一个用于PL),然后计算组合指标

  2. 与原始数据帧join

完全:

val indicators = cmc.groupBy("KEY").agg(
sum(when($"TYPE_CODE" === "PL", 1).otherwise(0)) as "pls",
sum(when($"TYPE_CODE" === "JL", 1).otherwise(0)) as "jls"
).withColumn("Indicator", when($"pls" > 0 && $"jls" > 0, "Y").otherwise("N"))
val result = cmc.join(indicators, "KEY")
.select("KEY", "TYPE_CODE", "Indicator")

这可能比@Psidom的答案慢,但可能更安全-collect_list如果特定键有大量匹配项(该列表必须存储在单个工作人员的内存中),则可能会出现问题。

编辑

如果已知输入是唯一(即JL/PL每个键最多只出现一次),则可以使用简单的count聚合来创建indicators,这(可以说)更容易阅读:

val indicators = cmc
.where($"TYPE_CODE".isin("PL", "JL"))
.groupBy("KEY").count()
.withColumn("Indicator", when($"count" === 2, "Y").otherwise("N"))

相关内容

  • 没有找到相关文章

最新更新