使用聚合功能清理缺失值火花



我想通过用平均值替换它们来清理缺失的值。这个源代码以前工作我不知道为什么,它现在不起作用。任何帮助将不胜感激。 这是我使用的数据集

RowNumber,Poids,Age,Taille,0MI,Hmean,CoocParam,LdpParam,Test2,Classe
0,,72,160,5,,2.9421,,3,4
1,54,70,,5,0.6301,2.7273,,3,
2,,51,164,5,,2.9834,,3,4
3,,74,170,5,0.6966,2.9654,2.3699,3,4
4,108,62,,5,0.6087,2.7093,2.1619,3,4

在这里我做了什么

val spark = SparkSession.builder.master("local").appName("my-spark-app").getOrCreate()
val df = spark.read.option("header", true).option("inferSchema", true).format("com.databricks.spark.csv").load("C:/Users/mhattabi/Desktop/data_with_missing_values3.csv")
df.show(false)
var newDF = df
df.dtypes.foreach { x =>
val colName = x._1
newDF = newDF.na.fill(df.agg(max(colName)).first()(0).toString, Seq(colName))
}
newDF.show(false)

这是结果,什么也没发生

initial_data
+---------+-----+---+------+---+------+---------+--------+-----+------+
|RowNumber|Poids|Age|Taille|0MI|Hmean |CoocParam|LdpParam|Test2|Classe|
+---------+-----+---+------+---+------+---------+--------+-----+------+
|0        |null |72 |160   |5  |null  |2.9421   |null    |3    |4     |
|1        |54   |70 |null  |5  |0.6301|2.7273   |null    |3    |null  |
|2        |null |51 |164   |5  |null  |2.9834   |null    |3    |4     |
|3        |null |74 |170   |5  |0.6966|2.9654   |2.3699  |3    |4     |
|4        |108  |62 |null  |5  |0.6087|2.7093   |2.1619  |3    |4     |
+---------+-----+---+------+---+------+---------+--------+-----+------+
new_data
+---------+-----+---+------+---+------+---------+--------+-----+------+
|RowNumber|Poids|Age|Taille|0MI|Hmean |CoocParam|LdpParam|Test2|Classe|
+---------+-----+---+------+---+------+---------+--------+-----+------+
|0        |null |72 |160   |5  |null  |2.9421   |null    |3    |4     |
|1        |54   |70 |null  |5  |0.6301|2.7273   |null    |3    |null  |
|2        |null |51 |164   |5  |null  |2.9834   |null    |3    |4     |
|3        |null |74 |170   |5  |0.6966|2.9654   |2.3699  |3    |4     |
|4        |108  |62 |null  |5  |0.6087|2.7093   |2.1619  |3    |4     |
+---------+-----+---+------+---+------+---------+--------+-----+------+

我该怎么办

您可以使用withColumnapi 并使用when函数检查columns中的空值为

df.dtypes.foreach { x =>
val colName = x._1
val fill = df.agg(max(col(s"`$colName`"))).first()(0).toString
newDF = newDF.withColumn(colName, when(col(s"`$colName`").isNull , fill).otherwise(col(s"`$colName`")) )
}
newDF.show(false)

我希望这可以解决您的问题

如果您尝试用平均值替换null值,则计算meanfill

import org.apache.spark.sql.functions.mean

val data = spark.read.option("header", true)
.option("inferSchema", true).format("com.databricks.spark.csv")
.load("data.csv")
//Calculate the mean for each column and create a map with its column name 
//and use na.fill() method to replace null with that mean
data.na.fill(data.columns.zip(
data.select(data.columns.map(mean(_)): _*).first.toSeq
).toMap)

我已经在本地测试了代码并且工作正常。

输出:

+---------+-----+---+------+---+------------------+---------+------------------+-----+------+
|RowNumber|Poids|Age|Taille|0MI|             Hmean|CoocParam|          LdpParam|Test2|Classe|
+---------+-----+---+------+---+------------------+---------+------------------+-----+------+
|        0|   81| 72|   160|  5|0.6451333333333333|   2.9421|2.2659000000000002|    3|     4|
|        1|   54| 70|   164|  5|            0.6301|   2.7273|2.2659000000000002|    3|     4|
|        2|   81| 51|   164|  5|0.6451333333333333|   2.9834|2.2659000000000002|    3|     4|
|        3|   81| 74|   170|  5|            0.6966|   2.9654|            2.3699|    3|     4|
|        4|  108| 62|   164|  5|            0.6087|   2.7093|            2.1619|    3|     4|
+---------+-----+---+------+---+------------------+---------+------------------+-----+------+

希望这有帮助!

这应该可以:

var imputeDF = df
df.dtypes.foreach { x => 
val colName = x._1
newDF = newDF.na.fill(df.agg(max(colName)).first()(0).toString , Seq(colName)) }

请注意,将可变数据类型与 scala 一起使用不是一个好的做法。

根据您的数据,您可以使用 SQL 联接或其他方式将 null 替换为更合适的值。

最新更新