How to replace ? symbols in DataFrame columns with the column mean in Spark Scala?



Given a DataFrame containing the following data:

+---+----+-----+----+
|one|two |three|four|
+---+----+-----+----+
|3  |?   |Jaun |3.47|
|3  |164 |Jaun |3.47|
|1  |?   |?    |2.68|
|3  |164 |Kaul |?   |
|1  |?   |?    |2.68|
+---+----+-----+----+

The code below produces the desired result, though it could probably be optimized. Note that it treats "?" as 0 when computing the mean:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("ERROR")

val inDF = // Read data
inDF
  // Replace "?" in "two" with the column mean (counting "?" as 0)
  .withColumn("two", when('two === "?",
    mean(when('two === "?", 0).otherwise('two)).over()).otherwise('two))
  // Same replacement for "four"
  .withColumn("four", when('four === "?",
    mean(when('four === "?", 0).otherwise('four)).over()).otherwise('four))
  // Replace "?" in "three" with the most frequent value (the mode)
  .withColumn("no_occurrence", count("*").over(Window.partitionBy("three")))
  .withColumn("max_occurrence", when('three =!= "?", max('no_occurrence).over()).otherwise(0))
  .withColumn("replacement", max(when('no_occurrence === 'max_occurrence, 'three)).over())
  .withColumn("three", when('three === "?", 'replacement).otherwise('three))
  .drop("no_occurrence", "max_occurrence", "replacement")
  .show(false)
+---+----+-----+----+
|one|two |three|four|
+---+----+-----+----+
|3  |65.6|Jaun |3.47|
|3  |164 |Jaun |3.47|
|3  |164 |Kaul |2.46|
|1  |65.6|Jaun |2.68|
|1  |65.6|Jaun |2.68|
+---+----+-----+----+
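The `// Read data` placeholder above can be filled with a small in-memory DataFrame matching the sample input. This is a sketch; typing every column as a string is an assumption based on the "?" markers appearing in numeric columns:

```scala
// Build the sample data in memory; "?" marks a missing value.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val inDF = Seq(
  ("3", "?",   "Jaun", "3.47"),
  ("3", "164", "Jaun", "3.47"),
  ("1", "?",   "?",    "2.68"),
  ("3", "164", "Kaul", "?"),
  ("1", "?",   "?",    "2.68")
).toDF("one", "two", "three", "four")
```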

Computing the mode is a bit tricky, because Spark SQL does not support mode() or any similar function. But you can emulate it with window functions. Note that the query below assumes the "?" markers have already been replaced with NULL, since coalesce() only falls back when the value is NULL:

select t.*,
       coalesce(two, two_avg),
       coalesce(three,
                max(case when three_cnt = max_three_cnt then three end) over ()
       ),
       coalesce(four, four_avg)
from (select t.*,
             max(three_cnt) over () as max_three_cnt
      from (select t.*,
                   avg(two) over () as two_avg,
                   count(*) over (partition by three) as three_cnt,
                   avg(four) over () as four_avg
            from t
           ) t
     ) t;
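To feed this query, the "?" markers first need to be converted to NULL and the result registered as the view `t`. A hedged Scala sketch, reusing `inDF` from the question (Spark SQL's standard nullif() returns NULL when its two arguments are equal):

```scala
// Turn "?" markers into NULLs so coalesce() in the SQL query can replace them,
// then expose the result under the table name "t" used by the query.
val nullified = inDF.selectExpr(
  "one",
  "nullif(two, '?')   as two",
  "nullif(three, '?') as three",
  "nullif(four, '?')  as four"
)
nullified.createOrReplaceTempView("t")
```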
