How to dynamically delete rows based on ID and Status = "Removed" in Scala



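Here is a sample dataset. How can I dynamically delete every row of an ID that has Status = "Removed" in some row, without hard-coding any values?

Sample dataset: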

ID  Status       Date      Amount
1   New         01/05/20    20
1   Assigned    02/05/20    30
1   In-Progress 02/05/20    50
2   New         02/05/20    30
2   Removed     03/05/20    20
3   In-Progress 09/05/20    50
3   Removed     09/05/20    20
4   New         10/05/20    20
4   Assigned    10/05/20    30

Expected result:

ID  Status       Date      Amount
1   New         01/05/20    20
1   Assigned    02/05/20    30
1   In-Progress 02/05/20    50
4   New         10/05/20    20
4   Assigned    10/05/20    30

Thanks in advance.

You can use filter, together with not like / rlike, to filter out the records with status = removed from the DataFrame.

import org.apache.spark.sql.functions._
// assuming df is the DataFrame
// using filter (or where): trim removes surrounding white space, lower converts to lower case
val df1 = df.filter(lower(trim(col("status"))) =!= "removed")
// or compare against "Removed" directly; this won't match if the data has mixed case
val df2 = df.filter(col("status") =!= "Removed")
// using not like
val df3 = df.filter(!lower(col("status")).like("removed"))
// using not rlike (rlike matches anywhere in the string, (?i) makes it case-insensitive)
val df4 = df.filter(!col("status").rlike("(?i)removed"))

Each of these DataFrames now contains only the rows whose status is not Removed.


UPDATE:

From Spark 2.4:

In this case, to drop every row of an ID that has a Removed status, we can use either a join or a window clause.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
// toDF needs spark.implicits._ (already imported in spark-shell)
import spark.implicits._

val df = Seq(
  (1, "New", "01/05/20", "20"),
  (1, "Assigned", "02/05/20", "30"),
  (1, "In-Progress", "02/05/20", "50"),
  (2, "New", "02/05/20", "30"),
  (2, "Removed", "03/05/20", "20"),
  (3, "In-Progress", "09/05/20", "50"),
  (3, "Removed", "09/05/20", "20"),
  (4, "New", "10/05/20", "20"),
  (4, "Assigned", "10/05/20", "30")
).toDF("ID", "Status", "Date", "Amount")

// collect the lower-cased statuses of each ID into an array
val df1 = df.
  groupBy("ID").
  agg(collect_list(lower(col("Status"))).alias("status_arr"))

// using array_contains function: keep only IDs whose array has no "removed"
df.alias("t1").join(df1.alias("t2"), Seq("ID"), "inner").
  filter(!array_contains(col("status_arr"), "removed")).
  drop("status_arr").show()

// without join, using a window clause that spans the whole partition of each ID
val w = Window.partitionBy("ID").orderBy("Status").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn("status_arr", collect_list(lower(col("Status"))).over(w)).
  filter(!array_contains(col("status_arr"), "removed")).
  drop("status_arr").
  show()
//+---+-----------+--------+------+
//| ID|     Status|    Date|Amount|
//+---+-----------+--------+------+
//|  1|        New|01/05/20|    20|
//|  1|   Assigned|02/05/20|    30|
//|  1|In-Progress|02/05/20|    50|
//|  4|        New|10/05/20|    20|
//|  4|   Assigned|10/05/20|    30|
//+---+-----------+--------+------+

For Spark < 2.4:

val df1=df.groupBy("id").agg(concat_ws("",collect_list(lower(col("Status")))).alias("status_arr"))
df.alias("t1").join(df1.alias("t2"),Seq("id"),"inner").
filter(!col("status_arr").contains("removed")).
drop(col("status_arr")).
show()
//Using window functions
df.withColumn("status_arr",concat_ws("",collect_list(lower(col("status"))).over(w))).
filter(!col("status_arr").contains("removed")).
drop(col("status_arr")).
show(false)
//+---+-----------+--------+------+
//| ID|     Status|    Date|Amount|
//+---+-----------+--------+------+
//|  1|        New|01/05/20|    20|
//|  1|   Assigned|02/05/20|    30|
//|  1|In-Progress|02/05/20|    50|
//|  4|        New|10/05/20|    20|
//|  4|   Assigned|10/05/20|    30|
//+---+-----------+--------+------+
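
As a possible variation on the window approach above, the statuses do not have to be collected into an array or string at all: a 0/1 flag can be computed per row and its maximum taken over each ID. This is only a sketch under assumptions: the object name `DropRemovedIdsWithFlag`, the helper `dropRemovedIds`, and the temporary column `has_removed` are made up for illustration, and the local `SparkSession` stands in for whatever session you already have.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lower, max, trim, when}

// Hypothetical helper object, not part of the original answer.
object DropRemovedIdsWithFlag {

  // Drops every row of an ID that has at least one "removed" status.
  def dropRemovedIds(df: DataFrame): DataFrame = {
    // 1 when the row's status is "removed" (trimmed, case-insensitive), else 0.
    val isRemoved = when(lower(trim(col("Status"))) === "removed", 1).otherwise(0)

    df.withColumn("has_removed", max(isRemoved).over(Window.partitionBy("ID")))
      .filter(col("has_removed") === 0) // keep IDs where the flag never fired
      .drop("has_removed")
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; spark-shell already provides `spark`.
    val spark = SparkSession.builder().master("local[*]").appName("drop-removed-ids").getOrCreate()
    import spark.implicits._

    val df = Seq(
      (1, "New", "01/05/20", "20"),
      (1, "Assigned", "02/05/20", "30"),
      (2, "New", "02/05/20", "30"),
      (2, "Removed", "03/05/20", "20"),
      (4, "New", "10/05/20", "20")
    ).toDF("ID", "Status", "Date", "Amount")

    // Only the rows for IDs 1 and 4 remain.
    dropRemovedIds(df).show()
    spark.stop()
  }
}
```

Compared with concatenating statuses into one string, the flag avoids accidental matches across the boundary between two adjacent status values.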

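Assuming res0 is your dataset, you can do:

import spark.implicits._
// =!= is the Column inequality operator (!== is deprecated since Spark 2.0)
val x = res0.where($"Status" =!= "Removed")
x.show()

This removes the rows whose status is Removed, but it does not give you what you want to achieve based on what you posted above, because the other rows of those IDs remain.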
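
One way to close that gap, sketched here under assumptions rather than taken from the answer, is to first collect the IDs that have a Removed row and then exclude them with a `left_anti` join. The object name `DropRemovedIdsAntiJoin` and the helper `dropRemovedIds` are hypothetical, and the column names `ID` and `Status` are taken from the sample dataset in the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lower, trim}

// Hypothetical helper object, named for illustration only.
object DropRemovedIdsAntiJoin {

  // Keeps only the rows whose ID never appears with a "removed" status.
  def dropRemovedIds(df: DataFrame): DataFrame = {
    // IDs that have at least one "removed" row (trimmed, case-insensitive).
    val removedIds = df
      .filter(lower(trim(col("Status"))) === "removed")
      .select("ID")
      .distinct()

    // left_anti keeps the left-side rows that have no match on the right.
    df.join(removedIds, Seq("ID"), "left_anti")
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; spark-shell already provides `spark`.
    val spark = SparkSession.builder().master("local[*]").appName("anti-join-sketch").getOrCreate()
    import spark.implicits._

    val res0 = Seq(
      (1, "New", "01/05/20", "20"),
      (2, "New", "02/05/20", "30"),
      (2, "Removed", "03/05/20", "20"),
      (4, "Assigned", "10/05/20", "30")
    ).toDF("ID", "Status", "Date", "Amount")

    // Both rows of ID 2 are dropped; IDs 1 and 4 remain.
    dropRemovedIds(res0).show()
    spark.stop()
  }
}
```

No value is hard-coded other than the status text itself, so the set of excluded IDs follows whatever the data contains.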
