Merge rows, keeping the latest value per (element, timestamp) pair


I have a PySpark dataframe containing ids, values and their associated timestamps. Here is a sample of it:

+---+-----+-------------------+------+-------------------+------+-------------------+
| id|value|           value_ts|value2|          value2_ts|value3|          value3_ts|
+---+-----+-------------------+------+-------------------+------+-------------------+
|  1|  0.5|2022-03-15 00:00:00|  null|               null|     7|2022-03-15 00:00:00|
|  2|  0.2|2022-03-18 00:00:00|  null|               null|     5|2022-03-18 00:00:00|
|  3| null|               null|  null|               null|    12|2022-03-15 00:00:00|
|  1|  1.2|2022-03-18 00:00:00|  null|               null|  null|               null|
|  1| null|               null|   124|2022-03-10 00:00:00|     6|2022-03-10 00:00:00|
|  3| null|               null|   413|2022-03-18 00:00:00|  null|               null|
+---+-----+-------------------+------+-------------------+------+-------------------+

For this data, I want to get the latest value for each value/value_ts pair, grouped by id. In this example we have:

  • id=1
    • the latest value_ts is 2022-03-18 00:00:00 and the corresponding value is 1.2
    • the latest value2_ts is 2022-03-10 00:00:00 and the corresponding value2 is 124
    • the latest value3_ts is 2022-03-15 00:00:00 and the corresponding value3 is 7
  • id=2
    • the latest value_ts is 2022-03-18 00:00:00 and the corresponding value is 0.2
    • the latest value2_ts is null and the corresponding value2 is null
    • the latest value3_ts is 2022-03-18 00:00:00 and the corresponding value3 is 5
  • id=3
    • the latest value_ts is null and the corresponding value is null
    • the latest value2_ts is 2022-03-18 00:00:00 and the corresponding value2 is 413
    • the latest value3_ts is 2022-03-15 00:00:00 and the corresponding value3 is 12

Since there are 3 distinct ids in the input, I expect 3 rows in the output, as shown below:

+---+-----+-------------------+------+-------------------+------+-------------------+
| id|value|           value_ts|value2|          value2_ts|value3|          value3_ts|
+---+-----+-------------------+------+-------------------+------+-------------------+
|  1|  1.2|2022-03-18 00:00:00|   124|2022-03-10 00:00:00|     7|2022-03-15 00:00:00|
|  2|  0.2|2022-03-18 00:00:00|  null|               null|     5|2022-03-18 00:00:00|
|  3| null|               null|   413|2022-03-18 00:00:00|    12|2022-03-15 00:00:00|
+---+-----+-------------------+------+-------------------+------+-------------------+

Could you help me get this result with PySpark?

Note that when a ts is null, the corresponding value is null as well. Here is the Python code to reproduce the input dataframe:

from datetime import datetime
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("local_test").getOrCreate()
df = spark.createDataFrame(
    [
        ["1", "0.5", datetime(2022, 3, 15), None, None, "7", datetime(2022, 3, 15)],
        ["2", "0.2", datetime(2022, 3, 18), None, None, "5", datetime(2022, 3, 18)],
        ["3", None, None, None, None, "12", datetime(2022, 3, 15)],
        ["1", "1.2", datetime(2022, 3, 18), None, None, None, None],
        ["1", None, None, "124", datetime(2022, 3, 10), "6", datetime(2022, 3, 10)],
        ["3", None, None, "413", datetime(2022, 3, 18), None, None],
    ],
    ["id", "value", "value_ts", "value2", "value2_ts", "value3", "value3_ts"],
)
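
As a quick sanity check (not part of the original question), printing the schema makes it explicit that the value columns are created as strings in this reproduction, while the *_ts columns are timestamps:

# Sanity check of the reproduced input: value, value2 and value3 are strings here.
df.printSchema()
df.show(truncate=False)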

This can be solved as follows:

  1. Identify the latest ts for each value/ts column pair
  2. Copy the values identified in step 1 to all rows belonging to the same id
  3. Finally, deduplicate by keeping only the first row of each group

from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql import DataFrame

def latest_ts(idf: DataFrame, val_col_name: str, ts_col_name: str) -> DataFrame:
    ws = (W.partitionBy("id")
          .orderBy(F.desc(ts_col_name))
          .rowsBetween(W.unboundedPreceding, W.unboundedFollowing))
    # Find the latest ts within each id
    latest_ts = F.max(ts_col_name).over(ws)
    # Keep the value corresponding to the latest ts and make the others null
    latest_val = F.when(F.col(ts_col_name) == latest_ts, F.col(val_col_name)).otherwise(F.lit(None))
    # Overwrite the value and ts columns so that every row of the id carries the latest pair
    return (idf.withColumn(val_col_name, F.first(latest_val, ignorenulls=True).over(ws))
               .withColumn(ts_col_name, latest_ts))

df_latest_ts = latest_ts(latest_ts(latest_ts(df, "value", "value_ts"), "value2", "value2_ts"), "value3", "value3_ts")

# Deduplicate: all rows of an id are now identical, so keep only the first one per id
ws_rn = W.partitionBy("id").orderBy(F.desc("value_ts"), F.desc("value2_ts"), F.desc("value3_ts"))
(df_latest_ts.withColumn("rn", F.row_number().over(ws_rn))
    .where("rn == 1")
    .drop("rn")
).show()
"""
+---+-----+-------------------+------+-------------------+------+-------------------+
| id|value|           value_ts|value2|          value2_ts|value3|          value3_ts|
+---+-----+-------------------+------+-------------------+------+-------------------+
|  1|  1.2|2022-03-18 00:00:00|   124|2022-03-10 00:00:00|     7|2022-03-15 00:00:00|
|  2|  0.2|2022-03-18 00:00:00|  null|               null|     5|2022-03-18 00:00:00|
|  3| null|               null|   413|2022-03-18 00:00:00|    12|2022-03-15 00:00:00|
+---+-----+-------------------+------+-------------------+------+-------------------+
"""
