Disclaimer: I'm very new to pyspark.
Here is an example:
Original dataset
+----------+--------------------+---+----+
|install_id|influencer_date_time|id1|id2 |
+----------+--------------------+---+----+
| 68483732| 2020-05-28 22:56:43|21 | 543|
| 68483732| 2020-05-28 23:21:53|35 | 231|
| 68483732| 2020-05-29 00:03:21|23 | 23 |
| 68483732| 2020-05-29 00:05:21|54 | 654|
| 68486103| 2020-06-01 00:37:38|23 | 234|
| 68486103| 2020-06-01 00:59:30|12 | 14 |
| 68486103| 2020-06-01 01:59:30|54 | 54 |
+----------+--------------------+---+----+
All my data is sorted by install_id and influencer_date_time in ascending order.
Filter condition:
- For each install_id, find the row where id1 and id2 are the same.
- For each install_id, delete any rows that come after the row found above.
In the example above, for install_id 68483732, id1 and id2 are the same in the third row, so we need to delete row 4. install_id 68486103 stays as-is, because there are no rows after the row with matching id1 and id2.
Here is what the final dataset should look like:
+----------+--------------------+---+----+
|install_id|influencer_date_time|id1|id2 |
+----------+--------------------+---+----+
| 68483732| 2020-05-28 22:56:43|21 | 543|
| 68483732| 2020-05-28 23:21:53|35 | 231|
| 68483732| 2020-05-29 00:03:21|23 | 23 |
| 68486103| 2020-06-01 00:37:38|23 | 234|
| 68486103| 2020-06-01 00:59:30|12 | 14 |
| 68486103| 2020-06-01 01:59:30|54 | 54 |
+----------+--------------------+---+----+
Any help would be greatly appreciated.
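For reference, the intended filtering logic can be sketched in plain Python (no Spark), assuming the rows are already sorted by install_id and influencer_date_time; the function name here is illustrative:

```python
from itertools import groupby

def keep_until_first_match(rows):
    """Within each install_id group, keep rows up to and including the
    first row where id1 == id2. Rows are (install_id, ts, id1, id2)
    tuples, already sorted by install_id and ts."""
    result = []
    for _, group in groupby(rows, key=lambda r: r[0]):
        matched = False
        for row in group:
            if matched:
                continue  # drop everything after the first match
            result.append(row)
            if row[2] == row[3]:
                matched = True
    return result

rows = [
    (68483732, "2020-05-28 22:56:43", 21, 543),
    (68483732, "2020-05-28 23:21:53", 35, 231),
    (68483732, "2020-05-29 00:03:21", 23, 23),
    (68483732, "2020-05-29 00:05:21", 54, 654),
    (68486103, "2020-06-01 00:37:38", 23, 234),
    (68486103, "2020-06-01 00:59:30", 12, 14),
    (68486103, "2020-06-01 01:59:30", 54, 54),
]
kept = keep_until_first_match(rows)
# row (68483732, "2020-05-29 00:05:21", 54, 654) is dropped; 6 rows remain
```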
Another option -
Written in Scala.
Load the test data (this assumes `import spark.implicits._` is in scope for `.toDS()`):
val data =
"""
|install_id|influencer_date_time|id1|id2
| 68483732| 2020-05-28 22:56:43|21 | 543
| 68483732| 2020-05-28 23:21:53|35 | 231
| 68483732| 2020-05-29 00:03:21|23 | 23
| 68483732| 2020-05-29 00:05:21|54 | 654
| 68483732| 2020-05-29 00:06:21|12 | 12
| 68483732| 2020-05-29 00:07:21|54 | 654
| 68486103| 2020-06-01 00:37:38|23 | 234
| 68486103| 2020-06-01 00:59:30|12 | 14
| 68486103| 2020-06-01 01:59:30|54 | 54
""".stripMargin
val stringDS = data.split(System.lineSeparator())
  .map(_.split("\\|").map(_.trim).mkString(","))
.toSeq.toDS()
val df = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.option("nullValue", "null")
.csv(stringDS)
df.show(false)
df.printSchema()
/**
* +----------+--------------------+---+---+
* |install_id|influencer_date_time|id1|id2|
* +----------+--------------------+---+---+
* |68483732 |2020-05-28 22:56:43 |21 |543|
* |68483732 |2020-05-28 23:21:53 |35 |231|
* |68483732 |2020-05-29 00:03:21 |23 |23 |
* |68483732 |2020-05-29 00:05:21 |54 |654|
* |68483732 |2020-05-29 00:06:21 |12 |12 |
* |68483732 |2020-05-29 00:07:21 |54 |654|
* |68486103 |2020-06-01 00:37:38 |23 |234|
* |68486103 |2020-06-01 00:59:30 |12 |14 |
* |68486103 |2020-06-01 01:59:30 |54 |54 |
* +----------+--------------------+---+---+
*
* root
* |-- install_id: integer (nullable = true)
* |-- influencer_date_time: timestamp (nullable = true)
* |-- id1: integer (nullable = true)
* |-- id2: integer (nullable = true)
*/
Drop rows after the first matching id1 and id2 in each group (uses `org.apache.spark.sql.expressions.Window` and `org.apache.spark.sql.functions._`):
val w = Window.partitionBy("install_id").orderBy("influencer_date_time")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn("new_col", min(when($"id1" === $"id2", $"influencer_date_time")).over(w))
  .filter($"influencer_date_time" <= $"new_col")
.show(false)
/**
* +----------+--------------------+---+---+-------------------+
* |install_id|influencer_date_time|id1|id2|new_col |
* +----------+--------------------+---+---+-------------------+
* |68483732 |2020-05-28 22:56:43 |21 |543|2020-05-29 00:03:21|
* |68483732 |2020-05-28 23:21:53 |35 |231|2020-05-29 00:03:21|
* |68483732 |2020-05-29 00:03:21 |23 |23 |2020-05-29 00:03:21|
* |68486103 |2020-06-01 00:37:38 |23 |234|2020-06-01 01:59:30|
* |68486103 |2020-06-01 00:59:30 |12 |14 |2020-06-01 01:59:30|
* |68486103 |2020-06-01 01:59:30 |54 |54 |2020-06-01 01:59:30|
* +----------+--------------------+---+---+-------------------+
*/
Drop rows after the last matching id1 and id2 in each group:
df.withColumn("new_col", max(when($"id1" === $"id2", $"influencer_date_time")).over(w))
  .filter($"influencer_date_time" <= $"new_col")
.show(false)
/**
* +----------+--------------------+---+---+-------------------+
* |install_id|influencer_date_time|id1|id2|new_col |
* +----------+--------------------+---+---+-------------------+
* |68483732 |2020-05-28 22:56:43 |21 |543|2020-05-29 00:06:21|
* |68483732 |2020-05-28 23:21:53 |35 |231|2020-05-29 00:06:21|
* |68483732 |2020-05-29 00:03:21 |23 |23 |2020-05-29 00:06:21|
* |68483732 |2020-05-29 00:05:21 |54 |654|2020-05-29 00:06:21|
* |68483732 |2020-05-29 00:06:21 |12 |12 |2020-05-29 00:06:21|
* |68486103 |2020-06-01 00:37:38 |23 |234|2020-06-01 01:59:30|
* |68486103 |2020-06-01 00:59:30 |12 |14 |2020-06-01 01:59:30|
* |68486103 |2020-06-01 01:59:30 |54 |54 |2020-06-01 01:59:30|
* +----------+--------------------+---+---+-------------------+
*/
Try the window lag function after comparing id1 and id2:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
# Test data
tst = sqlContext.createDataFrame([(1, 2, 3, 4), (1, 3, 4, 1), (1, 4, 5, 5), (1, 6, 7, 8), (2, 1, 9, 2), (2, 2, 9, 9)], schema=['col1', 'col2', 'id1', 'id2'])
w = Window.partitionBy('col1').orderBy('col2')
# 1 where id1 == id2, else 0
tst_cmp = tst.withColumn("cmp", F.when(F.col('id1') == F.col('id2'), 1).otherwise(0))
#%%
# Running sum of the lagged cmp: it stays 0 up to and including the first
# matching row and turns positive only on rows after it. The lag is
# materialized in a separate column first, since Spark does not allow
# nesting one window function inside another in a single expression.
tst_lag = tst_cmp.withColumn("lag_cmp", F.lag('cmp', default=0).over(w))
tst_flag = tst_lag.withColumn("flag", F.sum('lag_cmp').over(w))
tst_res = tst_flag.where(F.col('flag') == 0)
tst_res.show()
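To see why the flag column works, here is a minimal plain-Python emulation of the lag + running-sum logic for a single group (the helper name is illustrative, not part of the answer):

```python
def flags(cmp):
    """Emulate flag = sum(lag(cmp, default=0)) over an ordered window.

    flag[i] is the sum of cmp[0..i-1] (the lagged values), so it stays 0
    up to and including the first row where cmp == 1, and is positive for
    every row after it.
    """
    flag, running, lag = [], 0, 0
    for c in cmp:
        running += lag      # running sum of the *lagged* cmp values
        flag.append(running)
        lag = c
    return flag

cmp = [0, 0, 1, 0]          # third row has id1 == id2
print(flags(cmp))           # [0, 0, 0, 1] -> only the last row is filtered out
```

Rows are kept where the flag is 0, which matches the `where(F.col('flag')==0)` step above.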