在Pyspark中只保留修改过的行



我需要清理一个数据集,只过滤基于某些字段(在下面的示例中,我们只考虑每个id的城市和体育)的修改行(与前一个相比),只保留第一个出现的行。如果一行返回到之前的状态(但不是之前的状态),我仍然希望保留它。

Input df1

<表类> id 城市体育日期tbody><<tr>abc伦敦足球2022-02-11abc巴黎足球2022-02-12abc巴黎足球2022-02-13abc巴黎足球2022-02-14abc巴黎足球2022-02-15abc伦敦足球2022-02-16abc巴黎足球2022-02-17def巴黎凌空2022-02-10def巴黎凌空2022-02-11ghi曼彻斯特篮球2022-02-09

我将简单地使用滞后函数来比较哈希值:

from pyspark.sql import functions as F, Window
output_df = (
df.withColumn("hash", F.hash(F.col("city"), F.col("sport")))
.withColumn(
"prev_hash", F.lag("hash").over(Window.partitionBy("id").orderBy("date"))
)
.where(~F.col("hash").eqNullSafe(F.col("prev_hash")))
.drop("hash", "prev_hash")
)

output_df.show()
+---+----------+----------+----------+
| id|      city|     sport|      date|
+---+----------+----------+----------+
|abc|    london|  football|2022-02-11|
|abc|     paris|  football|2022-02-12|
|abc|    london|  football|2022-02-16|
|abc|     paris|  football|2022-02-17|
|def|     paris|    volley|2022-02-10|
|ghi|manchester|basketball|2022-02-09|
+---+----------+----------+----------+

虽然下面的解决方案适用于给定的数据,但有两件事:

  • Spark的架构不适合这样的串行处理。
  • 正如我在评论中指出的那样,你必须有一个关键属性或属性组合,如果数据碎片化,可以将数据恢复有序。分区和碎片的一个微小变化可能会改变结果。

逻辑是:

  • 转变"city"one_answers";sport">
  • 与本行的"城市"比较和";sport"这些移位的值。如果你看到不同,那就是一个新行。对于相似的行,没有区别。为此,我们使用Spark的窗口util和"dummy_serial_key"
  • 过滤符合上述条件的数据。

您可以根据您的数据设计随意添加更多列:

from pyspark.sql.window import Window
df = spark.createDataFrame(data=[["abc","london","football","2022-02-11"],["abc","paris","football","2022-02-12"],["abc","paris","football","2022-02-13"],["abc","paris","football","2022-02-14"],["abc","paris","football","2022-02-15"],["abc","london","football","2022-02-16"],["abc","paris","football","2022-02-17"],["def","paris","volley","2022-02-10"],["def","paris","volley","2022-02-11"],["ghi","manchester","basketball","2022-02-09"]], schema=["id","city","sport","date"])
df = df.withColumn("date", F.to_date("date", format="yyyy-MM-dd"))
df = df.withColumn("dummy_serial_key", F.lit(0))
dummy_w = Window.partitionBy("dummy_serial_key").orderBy("dummy_serial_key")
df = df.withColumn("city_prev", F.lag("city", offset=1).over(dummy_w))
df = df.withColumn("sport_prev", F.lag("sport", offset=1).over(dummy_w))
df = df.filter(
(F.col("city_prev").isNull())
| (F.col("sport_prev").isNull())
| (F.col("city") != F.col("city_prev"))
| (F.col("sport") != F.col("sport_prev"))
)
df = df.drop("dummy_serial_key", "city_prev", "sport_prev")
+---+----------+----------+----------+
| id|      city|     sport|      date|
+---+----------+----------+----------+
|abc|    london|  football|2022-02-11|
|abc|     paris|  football|2022-02-12|
|abc|    london|  football|2022-02-16|
|abc|     paris|  football|2022-02-17|
|def|     paris|    volley|2022-02-10|
|ghi|manchester|basketball|2022-02-09|
+---+----------+----------+----------+

最新更新