Merge multiple Spark rows in a dataframe into a single row per ID based on update_time



We need to merge multiple rows that share the same ID into a single record using PySpark. If a column has been updated more than once, we must take the value from the most recent update. Note that NULL means the column was not updated in that row. Essentially, we need to produce one row containing the consolidated updates for the record. So, for example, if this is the dataframe ...

Looking for an answer similar to this one, but in PySpark: Merge rows in a spark scala Dataframe

------------------------------------------------------------
| id       | column1          | column2         | updated_at |
------------------------------------------------------------
| 123      | update1          | <*no-update*>   | 1634228709 |   
| 123      | <*no-update*>    | 80              | 1634228724 |
| 123      | update2          | <*no-update*>   | 1634229000 |

The expected output is:

------------------------------------------------------------
| id       | column1          | column2       | updated_at |
------------------------------------------------------------
| 123      | update2          | 80            | 1634229000 |

Let's assume our input dataframe is:

+---+-------+----+----------+
|id |col1   |col2|updated_at|
+---+-------+----+----------+
|123|null   |null|1634228709|
|123|null   |80  |1634228724|
|123|update2|90  |1634229000|
|12 |update1|null|1634221233|
|12 |null   |80  |1634228333|
|12 |update2|null|1634221220|
+---+-------+----+----------+

What we want is to cast updated_at to TimestampType and then sort by id and, within each id, by updated_at in descending order:

df = df.withColumn("updated_at", F.col("updated_at").cast(TimestampType())).orderBy(
    F.col("id"), F.col("updated_at").desc()
)

which gives:

+---+-------+----+-------------------+
|id |col1   |col2|updated_at         |
+---+-------+----+-------------------+
|12 |null   |80  |2021-10-14 18:18:53|
|12 |update1|null|2021-10-14 16:20:33|
|12 |update2|null|2021-10-14 16:20:20|
|123|update2|90  |2021-10-14 18:30:00|
|123|null   |80  |2021-10-14 18:25:24|
|123|null   |null|2021-10-14 18:25:09|
+---+-------+----+-------------------+

Now, grouping by id, take the first non-null value in each of the remaining columns (df.columns[1:], i.e. every column except id), or null if there is none:

exp = [F.first(x, ignorenulls=True).alias(x) for x in df.columns[1:]]
df = df.groupBy(F.col("id")).agg(*exp)

The result is:

+---+-------+----+-------------------+
|id |col1   |col2|updated_at         |
+---+-------+----+-------------------+
|123|update2|90  |2021-10-14 18:30:00|
|12 |update1|80  |2021-10-14 18:18:53|
+---+-------+----+-------------------+
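
As an aside, the same merge can also be expressed with a window over each id, so that picking the latest non-null value does not depend on first() seeing the rows in the sorted order. This is only a minimal sketch, not part of the original answer; w and merged are illustrative names, and it is applied to the dataframe as it looks before the groupBy step:

from pyspark.sql import Window

# Frame covering all rows of the same id, ordered by updated_at ascending.
w = (
    Window.partitionBy("id")
    .orderBy("updated_at")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

# last(..., ignorenulls=True) over that frame returns the most recently
# updated non-null value of each column; dropDuplicates keeps one row per id.
merged = df.select(
    "id",
    *[F.last(c, ignorenulls=True).over(w).alias(c) for c in df.columns[1:]],
).dropDuplicates(["id"])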
Here is the complete example code for the approach above:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import TimestampType

if __name__ == "__main__":
    spark = SparkSession.builder.master("local").appName("Test").getOrCreate()

    data = [
        (123, None, None, 1634228709),
        (123, None, 80, 1634228724),
        (123, "update2", 90, 1634229000),
        (12, "update1", None, 1634221233),
        (12, None, 80, 1634228333),
        (12, "update2", None, 1634221220),
    ]
    columns = ["id", "col1", "col2", "updated_at"]
    df = spark.createDataFrame(data, columns)

    # Cast updated_at to a timestamp and sort each id's rows from newest to oldest.
    df = df.withColumn("updated_at", F.col("updated_at").cast(TimestampType())).orderBy(
        F.col("id"), F.col("updated_at").desc()
    )

    # For every column except id, keep the first non-null value per id.
    exp = [F.first(x, ignorenulls=True).alias(x) for x in df.columns[1:]]
    df = df.groupBy(F.col("id")).agg(*exp)