我们需要使用Pyspark将基于ID的多行合并为单个记录。如果对列有多个更新,那么我们必须选择对其进行了最后更新的列。请注意,NULL表示没有对该实例中的列进行更新。基本上,我们需要创建一个单行,包含对记录的合并更新。因此,例如,如果这是数据框架…
寻找类似的答案,但在Pyspark ..合并spark scala Dataframe中的行
------------------------------------------------------------
| id | column1 | column2 | updated_at |
------------------------------------------------------------
| 123 | update1 | <*no-update*> | 1634228709 |
| 123 | <*no-update*> | 80 | 1634228724 |
| 123 | update2 | <*no-update*> | 1634229000 |
预期输出为-
------------------------------------------------------------
| id | column1 | column2 | updated_at |
------------------------------------------------------------
| 123 | update2 | 80 | 1634229000 |
假设我们的输入数据框是:
+---+-------+----+----------+
|id |col1 |col2|updated_at|
+---+-------+----+----------+
|123|null |null|1634228709|
|123|null |80 |1634228724|
|123|update2|90 |1634229000|
|12 |update1|null|1634221233|
|12 |null |80 |1634228333|
|12 |update2|null|1634221220|
+---+-------+----+----------+
我们想要的是将updated_at
转换为TimestampType
,然后按desc
的顺序将id
和updated_at
排序:
df = df.withColumn("updated_at", F.col("updated_at").cast(TimestampType())).orderBy(
F.col("id"), F.col("updated_at").desc()
)
得到:
+---+-------+----+-------------------+
|id |col1 |col2|updated_at |
+---+-------+----+-------------------+
|12 |null |80 |2021-10-14 18:18:53|
|12 |update1|null|2021-10-14 16:20:33|
|12 |update2|null|2021-10-14 16:20:20|
|123|update2|90 |2021-10-14 18:30:00|
|123|null |80 |2021-10-14 18:25:24|
|123|null |null|2021-10-14 18:25:09|
+---+-------+----+-------------------+
现在在每列中获得第一个非None
值或返回None
并按id
分组:
exp = [F.first(x, ignorenulls=True).alias(x) for x in df.columns[1:]]
df = df.groupBy(F.col("id")).agg(*exp)
结果是:
+---+-------+----+-------------------+
|id |col1 |col2|updated_at |
+---+-------+----+-------------------+
|123|update2|90 |2021-10-14 18:30:00|
|12 |update1|80 |2021-10-14 18:18:53|
+---+-------+----+-------------------+
下面是完整的示例代码:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import TimestampType
if __name__ == "__main__":
spark = SparkSession.builder.master("local").appName("Test").getOrCreate()
data = [
(123, None, None, 1634228709),
(123, None, 80, 1634228724),
(123, "update2", 90, 1634229000),
(12, "update1", None, 1634221233),
(12, None, 80, 1634228333),
(12, "update2", None, 1634221220),
]
columns = ["id", "col1", "col2", "updated_at"]
df = spark.createDataFrame(data, columns)
df = df.withColumn("updated_at", F.col("updated_at").cast(TimestampType())).orderBy(
F.col("id"), F.col("updated_at").desc()
)
exp = [F.first(x, ignorenulls=True).alias(x) for x in df.columns[1:]]
df = df.groupBy(F.col("id")).agg(*exp)