Pyspark通过在另一列中搜索相同的值来替换NA



Column_1的值不能在column_2上有多个值。因此,对于相同的Id,我们有相同的值。

column_1 column_2
52     A
78     B
52 

预计

column_1 column_2
52     A
78     B
52     A

这意味着在column_1中搜索与相同缺失的column_2 id匹配的第一个column_1值。

我有一个使用R的工作解决方案,但是使用pyspark我找不到类似的方法。

由于相同的ID将始终具有相同的值,如您所述

实现此目的的一种方法是使用数据中存在的固有序列顺序,并使用lag值填充缺失值

您可以使用Lag Function来生成与您的col_1和Coalesce关联的前一个值,以从两个

中获得第一个非空值。

数据准备

df = pd.DataFrame({
'col_1': [52,78,52,52,78,78],
'col_2': ['A','B',None,'A','B',None]
})
sparkDF = sql.createDataFrame(df)
sparkDF.show()
+-----+-----+
|col_1|col_2|
+-----+-----+
|   52|    A|
|   78|    B|
|   52| null|
|   52|    A|
|   78|    B|
|   78| null|
+-----+-----+

滞后
window = Window.partitionBy('col_1').orderBy(F.col('col_2').desc())

sparkDF = sparkDF.withColumn('col_2_lag',F.lag('col_2').over(window))

sparkDF.show()
+-----+-----+---------+
|col_1|col_2|col_2_lag|
+-----+-----+---------+
|   52|    A|     null|
|   52|    A|        A|
|   52| null|        A|
|   78|    B|     null|
|   78|    B|        B|
|   78| null|        B|
+-----+-----+---------+

合并

sparkDF  = sparkDF.withColumn('col_2',F.coalesce(F.col('col_2'),F.col('col_2_lag'))).drop('col_2_lag')

sparkDF.show()
+-----+-----+
|col_1|col_2|
+-----+-----+
|   52|    A|
|   52|    A|
|   52|    A|
|   78|    B|
|   78|    B|
|   78|    B|
+-----+-----+

我会这样做,使用max:

from pyspark.sql import functions as F, Window
df.withColumn(
"column_2",
F.coalesce(
F.col("column_2"), F.max("column_2").over(Window.partitionBy("column_1"))
),
).show()

最新更新