Given a DataFrame with the following data, where "?" marks a missing value:

+---+---+-----+----+
|one|two|three|four|
+---+---+-----+----+
|3  |?  |Jaun |3.47|
|3  |164|Jaun |3.47|
|1  |?  |?    |2.68|
|3  |164|Kaul |?   |
|1  |?  |?    |2.68|
+---+---+-----+----+
The following code produces the desired result, but it could probably be optimized.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("ERROR")
val inDF = // Read data
inDF
  // Numeric columns: replace "?" with the column mean ("?" counted as 0)
  .withColumn("two", when('two === "?",
    mean(when('two === "?", 0).otherwise('two)).over()).otherwise('two))
  .withColumn("four", when('four === "?",
    mean(when('four === "?", 0).otherwise('four)).over()).otherwise('four))
  // Categorical column: replace "?" with the most frequent (mode) value
  .withColumn("no_occurrence", count("*").over(Window.partitionBy("three")))
  .withColumn("max_occurrence", when('three =!= "?", max('no_occurrence).over()).otherwise(0))
  .withColumn("replacement", max(when('no_occurrence === 'max_occurrence, 'three)).over())
  .withColumn("three", when('three === "?", 'replacement).otherwise('three))
  .drop("no_occurrence", "max_occurrence", "replacement")
  .show(false)
+---+----+-----+----+
|one|two |three|four|
+---+----+-----+----+
|3 |65.6|Jaun |3.47|
|3 |164 |Jaun |3.47|
|3 |164 |Kaul |2.46|
|1 |65.6|Jaun |2.68|
|1 |65.6|Jaun |2.68|
+---+----+-----+----+
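Note where the imputed values in the output come from: inside `mean`, the inner `when` replaces "?" with 0 before averaging, so the mean runs over all five rows, zeros included: (0+164+0+164+0)/5 = 65.6 for `two` and (3.47+3.47+2.68+0+2.68)/5 = 2.46 for `four`. A quick plain-Scala check of that arithmetic (no Spark needed; `meanWithZeros` is a hypothetical helper, not part of the code above):

```scala
// Mean with "?" entries counted as 0 — the same arithmetic as
// mean(when('col === "?", 0).otherwise('col)).over() in the code above.
def meanWithZeros(xs: Seq[String]): Double = {
  val nums = xs.map(x => if (x == "?") 0.0 else x.toDouble)
  nums.sum / nums.size
}

val two  = Seq("?", "164", "?", "164", "?")
val four = Seq("3.47", "3.47", "2.68", "?", "2.68")
println(meanWithZeros(two))   // 65.6
println(meanWithZeros(four))  // ~2.46 (up to floating-point rounding)
```

Whether counting "?" as 0 is the right choice is debatable; ignoring the missing rows entirely (as the SQL answer below does via `avg` over non-NULL values) gives a different, often more sensible, mean.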
The mode is a bit tricky here, because Spark SQL does not support mode()
or a similar function. But you can emulate it with window functions:
select t.*,
coalesce(two, two_avg),
coalesce(three,
max(case when three_cnt = max_three_cnt then three end) over ()
),
coalesce(four, four_avg)
from (select t.*,
max(three_cnt) over () as max_three_cnt
from (select t.*,
avg(two) over () as two_avg,
count(*) over (partition by three) as three_cnt,
avg(four) over () as four_avg
from t
) t
) t;
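Both the DataFrame and SQL versions emulate mode() the same way: count occurrences per value, then keep a value with the maximal count, with `max(case when ...)` breaking ties toward the "largest" value. A plain-Scala sketch of that logic (`mode` is a hypothetical helper, no Spark required):

```scala
// Mode: count occurrences per value, take the value with the highest
// count; ties resolve to the larger value, mirroring the max(...) trick.
def mode[T: Ordering](xs: Seq[T]): T =
  xs.groupBy(identity).toSeq
    .map { case (v, g) => (g.size, v) }
    .max                       // (count, value) pairs compare count-first
    ._2

println(mode(Seq("Jaun", "Jaun", "Kaul", "Jaun")))  // Jaun
```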