Pyspark 数据帧插补 -- 根据指定条件将未知值和缺失值替换为列平均值



给定一个Spark数据帧,我想根据该列的非缺失和非未知值来计算列平均值。然后我想采取这种手段,并用它来替换列中缺失的&未知值。

例如,假设我使用的是:

  • 名为df的数据帧,其中每条记录代表一个单独的列,所有列都是整数或数字
  • 列命名年龄(每条记录的年龄)
  • 名为missing_age的列(如果该个人没有年龄,则等于1,否则等于0)
  • 名为unknown_age的列(如果个人年龄未知,则等于1,否则为0)

然后我可以计算这个平均值,如下所示。

calc_mean = df.where((col("unknown_age") == 0) & (col("missing_age") == 0))
.agg(avg(col("age")))

或通过SQL和windows函数,

mean_compute = hiveContext.sql("select avg(age) over() as mean from df 
where missing_age = 0 and unknown_age = 0")

如果可以的话,我不想使用SQL/windows函数。我的挑战是接受这一方法,并使用非SQL方法替换未知/缺失的值。

我尝试过使用when()、where()、replace()、withColumn、UDF和组合。。。不管我做什么,我要么会出错,要么结果不是我所期望的。这是我尝试过的许多不起作用的事情中的一个例子。

imputed = df.when((col("unknown_age") == 1) | (col("missing_age") == 1),
calc_mean).otherwise("age")

我在网上搜索过,但没有发现类似的插补类型的问题,所以非常感谢任何帮助。这可能是我错过的非常简单的事情。

附带说明——我正在尝试将此代码应用于Spark Dataframe中列名中没有unknown_或missing_的所有列。我可以将Spark相关的代码包装在Python的"for循环"中,然后循环所有适用的列来完成这项工作吗?

更新:

还了解了如何在列中循环。。。下面是一个例子。

for x in df.columns:
if 'unknown_' not in x and 'missing_' not in x:
avg_compute = df.where(df['missing_' + x] != 1).agg(avg(x)).first()[0]
df = df.withColumn(x + 'mean_miss_imp', when((df['missing_' + x] == 1), 
avg_compute).otherwise(df[x]))

如果未知或缺失的年龄是某个值:

from pyspark.sql.functions import col, avg, when
df = sc.parallelize([
(10, 0, 0), (20, 0, 0), (-1, 1, 0), (-1, 0, 1)
]).toDF(["age", "missing_age", "unknown_age"])
avg_age = df.where(
(col("unknown_age") != 1) & (col("missing_age") != 1)
).agg(avg("age")).first()[0]
df.withColumn("age_imp", when(
(col("unknown_age") == 1) | (col("missing_age") == 1), avg_age
).otherwise(col("age")))

如果未知或丢失的年龄为NULL,您可以将其简化为:

df = sc.parallelize([
(10, 0, 0), (20, 0, 0), (None, 1, 0), (None, 0, 1)
]).toDF(["age", "missing_age", "unknown_age"])
df.na.fill(df.na.drop().agg(avg("age")).first()[0], ["age"])

相关内容

  • 没有找到相关文章

最新更新