我需要清理一个数据集,只过滤基于某些字段(在下面的示例中,我们只考虑每个id的城市和体育)的修改行(与前一个相比),只保留第一个出现的行。如果一行返回到之前的状态(但不是之前的状态),我仍然希望保留它。
Input df1
<表类>
id
城市体育日期 tbody><<tr>abc 伦敦 足球 2022-02-11 abc巴黎 足球 2022-02-12 abc巴黎 足球 2022-02-13 abc巴黎 足球 2022-02-14 abc巴黎 足球 2022-02-15 abc伦敦 足球 2022-02-16 abc巴黎 足球 2022-02-17 def 巴黎 凌空 2022-02-10 def 巴黎 凌空 2022-02-11 ghi 曼彻斯特 篮球 2022-02-09 表类>
我将简单地使用滞后函数来比较哈希值:
from pyspark.sql import functions as F, Window
output_df = (
df.withColumn("hash", F.hash(F.col("city"), F.col("sport")))
.withColumn(
"prev_hash", F.lag("hash").over(Window.partitionBy("id").orderBy("date"))
)
.where(~F.col("hash").eqNullSafe(F.col("prev_hash")))
.drop("hash", "prev_hash")
)
output_df.show()
+---+----------+----------+----------+
| id| city| sport| date|
+---+----------+----------+----------+
|abc| london| football|2022-02-11|
|abc| paris| football|2022-02-12|
|abc| london| football|2022-02-16|
|abc| paris| football|2022-02-17|
|def| paris| volley|2022-02-10|
|ghi|manchester|basketball|2022-02-09|
+---+----------+----------+----------+
虽然下面的解决方案适用于给定的数据,但有两件事:
- Spark的架构不适合这样的串行处理。
- 正如我在评论中指出的那样,你必须有一个关键属性或属性组合,如果数据碎片化,可以将数据恢复有序。分区和碎片的一个微小变化可能会改变结果。
逻辑是:
- 转变"city"one_answers";sport">
- 与本行的"城市"比较和";sport"这些移位的值。如果你看到不同,那就是一个新行。对于相似的行,没有区别。为此,我们使用Spark的窗口util和"dummy_serial_key"
- 过滤符合上述条件的数据。
您可以根据您的数据设计随意添加更多列:
from pyspark.sql.window import Window
df = spark.createDataFrame(data=[["abc","london","football","2022-02-11"],["abc","paris","football","2022-02-12"],["abc","paris","football","2022-02-13"],["abc","paris","football","2022-02-14"],["abc","paris","football","2022-02-15"],["abc","london","football","2022-02-16"],["abc","paris","football","2022-02-17"],["def","paris","volley","2022-02-10"],["def","paris","volley","2022-02-11"],["ghi","manchester","basketball","2022-02-09"]], schema=["id","city","sport","date"])
df = df.withColumn("date", F.to_date("date", format="yyyy-MM-dd"))
df = df.withColumn("dummy_serial_key", F.lit(0))
dummy_w = Window.partitionBy("dummy_serial_key").orderBy("dummy_serial_key")
df = df.withColumn("city_prev", F.lag("city", offset=1).over(dummy_w))
df = df.withColumn("sport_prev", F.lag("sport", offset=1).over(dummy_w))
df = df.filter(
(F.col("city_prev").isNull())
| (F.col("sport_prev").isNull())
| (F.col("city") != F.col("city_prev"))
| (F.col("sport") != F.col("sport_prev"))
)
df = df.drop("dummy_serial_key", "city_prev", "sport_prev")
+---+----------+----------+----------+
| id| city| sport| date|
+---+----------+----------+----------+
|abc| london| football|2022-02-11|
|abc| paris| football|2022-02-12|
|abc| london| football|2022-02-16|
|abc| paris| football|2022-02-17|
|def| paris| volley|2022-02-10|
|ghi|manchester|basketball|2022-02-09|
+---+----------+----------+----------+