PySpark - for each ID, I want to filter out data that occurs after the row where the two ids match



Disclaimer: I'm very new to PySpark.

Here's an example:

Original dataset

+----------+--------------------+---+----+
|install_id|influencer_date_time|id1|id2 | 
+----------+--------------------+---+----+
|  68483732| 2020-05-28 22:56:43|21 | 543|
|  68483732| 2020-05-28 23:21:53|35 | 231|
|  68483732| 2020-05-29 00:03:21|23 | 23 |
|  68483732| 2020-05-29 00:05:21|54 | 654|
|  68486103| 2020-06-01 00:37:38|23 | 234|
|  68486103| 2020-06-01 00:59:30|12 | 14 |
|  68486103| 2020-06-01 01:59:30|54 | 54 |
+----------+--------------------+---+----+

All of my data is sorted by install_id and then influencer_date_time, in ascending order.

Filter criteria:

  • For each install_id, find the first row where id1 and id2 are the same.
  • For each install_id, remove any rows that come after the row found above.

In the example above, for install_id 68483732, id1 and id2 are the same in the third row, so we need to remove the fourth row.

For install_id 68486103, the data stays as is, because there are no rows after the row with matching id1 and id2.

Here is what the final dataset should look like:

+----------+--------------------+---+----+
|install_id|influencer_date_time|id1|id2 | 
+----------+--------------------+---+----+
|  68483732| 2020-05-28 22:56:43|21 | 543|
|  68483732| 2020-05-28 23:21:53|35 | 231|
|  68483732| 2020-05-29 00:03:21|23 | 23 |
|  68486103| 2020-06-01 00:37:38|23 | 234|
|  68486103| 2020-06-01 00:59:30|12 | 14 |
|  68486103| 2020-06-01 01:59:30|54 | 54 |
+----------+--------------------+---+----+

Any help would be greatly appreciated.
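For reference, one way this filter could be expressed in PySpark is to first find, for each install_id, the timestamp of the first row where id1 equals id2, and then keep only the rows at or before that timestamp. A minimal sketch (assuming df is the DataFrame shown above):

from pyspark.sql import functions as F

# timestamp of the first id1 == id2 row per install_id
first_match = (df
    .filter(F.col("id1") == F.col("id2"))
    .groupBy("install_id")
    .agg(F.min("influencer_date_time").alias("first_match_time")))

# keep rows at or before the first match; install_ids with no match keep all rows
result = (df
    .join(first_match, on="install_id", how="left")
    .filter(F.col("first_match_time").isNull() |
            (F.col("influencer_date_time") <= F.col("first_match_time")))
    .drop("first_match_time"))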

Another option -

Written in Scala.

Load the test data

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val data =
"""
|install_id|influencer_date_time|id1|id2
|  68483732| 2020-05-28 22:56:43|21 | 543
|  68483732| 2020-05-28 23:21:53|35 | 231
|  68483732| 2020-05-29 00:03:21|23 | 23
|  68483732| 2020-05-29 00:05:21|54 | 654
|  68483732| 2020-05-29 00:06:21|12 | 12
|  68483732| 2020-05-29 00:07:21|54 | 654
|  68486103| 2020-06-01 00:37:38|23 | 234
|  68486103| 2020-06-01 00:59:30|12 | 14
|  68486103| 2020-06-01 01:59:30|54 | 54
""".stripMargin
val stringDS = data.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.option("nullValue", "null")
.csv(stringDS)
df.show(false)
df.printSchema()
/**
* +----------+--------------------+---+---+
* |install_id|influencer_date_time|id1|id2|
* +----------+--------------------+---+---+
* |68483732  |2020-05-28 22:56:43 |21 |543|
* |68483732  |2020-05-28 23:21:53 |35 |231|
* |68483732  |2020-05-29 00:03:21 |23 |23 |
* |68483732  |2020-05-29 00:05:21 |54 |654|
* |68483732  |2020-05-29 00:06:21 |12 |12 |
* |68483732  |2020-05-29 00:07:21 |54 |654|
* |68486103  |2020-06-01 00:37:38 |23 |234|
* |68486103  |2020-06-01 00:59:30 |12 |14 |
* |68486103  |2020-06-01 01:59:30 |54 |54 |
* +----------+--------------------+---+---+
*
* root
* |-- install_id: integer (nullable = true)
* |-- influencer_date_time: timestamp (nullable = true)
* |-- id1: integer (nullable = true)
* |-- id2: integer (nullable = true)
*/

Drop rows after the first matching id1 and id2 in a group

val w = Window.partitionBy("install_id").orderBy("influencer_date_time")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn("new_col", min(when($"id1" === $"id2", $"influencer_date_time")).over(w))
.filter($"influencer_date_time".cast("long") - $"new_col".cast("long")<=0)
.show(false)
/**
* +----------+--------------------+---+---+-------------------+
* |install_id|influencer_date_time|id1|id2|new_col            |
* +----------+--------------------+---+---+-------------------+
* |68483732  |2020-05-28 22:56:43 |21 |543|2020-05-29 00:03:21|
* |68483732  |2020-05-28 23:21:53 |35 |231|2020-05-29 00:03:21|
* |68483732  |2020-05-29 00:03:21 |23 |23 |2020-05-29 00:03:21|
* |68486103  |2020-06-01 00:37:38 |23 |234|2020-06-01 01:59:30|
* |68486103  |2020-06-01 00:59:30 |12 |14 |2020-06-01 01:59:30|
* |68486103  |2020-06-01 01:59:30 |54 |54 |2020-06-01 01:59:30|
* +----------+--------------------+---+---+-------------------+
*/
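Since the question is about PySpark, a rough PySpark equivalent of the min-window approach above (a sketch, assuming the original DataFrame is named df) could look like this:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# whole-group window per install_id, so min() sees every row in the group
w = (Window.partitionBy("install_id")
     .orderBy("influencer_date_time")
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

# timestamp of the first id1 == id2 row in the group, attached to every row
result = (df
    .withColumn("first_match_time",
                F.min(F.when(F.col("id1") == F.col("id2"),
                             F.col("influencer_date_time"))).over(w))
    .filter(F.col("influencer_date_time") <= F.col("first_match_time"))
    .drop("first_match_time"))

As in the Scala version, a group with no matching row gets a null first_match_time, so this filter drops that group entirely.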

Drop rows after the last matching id1 and id2 in a group

// drop rows after last matching id1 and id2 in a group
df.withColumn("new_col", max(when($"id1" === $"id2", $"influencer_date_time")).over(w))
.filter($"influencer_date_time".cast("long") - $"new_col".cast("long")<=0)
.show(false)
/**
* +----------+--------------------+---+---+-------------------+
* |install_id|influencer_date_time|id1|id2|new_col            |
* +----------+--------------------+---+---+-------------------+
* |68483732  |2020-05-28 22:56:43 |21 |543|2020-05-29 00:06:21|
* |68483732  |2020-05-28 23:21:53 |35 |231|2020-05-29 00:06:21|
* |68483732  |2020-05-29 00:03:21 |23 |23 |2020-05-29 00:06:21|
* |68483732  |2020-05-29 00:05:21 |54 |654|2020-05-29 00:06:21|
* |68483732  |2020-05-29 00:06:21 |12 |12 |2020-05-29 00:06:21|
* |68486103  |2020-06-01 00:37:38 |23 |234|2020-06-01 01:59:30|
* |68486103  |2020-06-01 00:59:30 |12 |14 |2020-06-01 01:59:30|
* |68486103  |2020-06-01 01:59:30 |54 |54 |2020-06-01 01:59:30|
* +----------+--------------------+---+---+-------------------+
*/
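In the PySpark sketch above, the same "keep everything up to the last match" behaviour would be obtained by simply replacing F.min with F.max.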

Try using the lag function of a window after the comparison

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Test data
tst = sqlContext.createDataFrame([(1,2,3,4),(1,3,4,1),(1,4,5,5),(1,6,7,8),(2,1,9,2),(2,2,9,9)],schema=['col1','col2','id1','id2'])
w = Window.partitionBy('col1').orderBy('col2')
# cmp is 1 where id1 == id2, else 0
tst_cmp = tst.withColumn("cmp", F.when(F.col('id1') == F.col('id2'), 1).otherwise(0))
# running sum of the lagged cmp: stays 0 up to and including the first matching row
tst_flag = tst_cmp.withColumn("flag", F.sum(F.lag('cmp', default=0).over(w)).over(w))
# keep only rows at or before the first match in each group
tst_res = tst_flag.where(F.col('flag') == 0)
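Applied to the question's own columns, the same lag-and-cumulative-sum pattern would look roughly like this (a sketch, reusing the imports above and assuming the original DataFrame is named df):

w = Window.partitionBy('install_id').orderBy('influencer_date_time')
res = (df
    .withColumn("cmp", F.when(F.col('id1') == F.col('id2'), 1).otherwise(0))
    .withColumn("flag", F.sum(F.lag('cmp', default=0).over(w)).over(w))
    .where(F.col('flag') == 0)
    .drop("cmp", "flag"))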
