How to get the most common non-null value in a column?



I have the following DataFrame df:

+-----+-------------+--------+--------------------+
|   id|         name|    type|                 url|
+-----+-------------+--------+--------------------+
|    1|      NT Note|    aaaa|                null|
|    1|      NT Note|    aaaa|http://www.teleab...|
|    1|      NT Note|    aaaa|http://www.teleab...|
|    1|      NT Note|    aaaa|                null|
|    1|      NT Note|    aaaa|                null|
|    2|          ABC|    bbbb|                null|
|    2|          ABC|    bbbb|                null|
|    2|          ABC|    bbbb|                null|
|    2|          ABC|    bbbb|                null|
+-----+-------------+--------+--------------------+

I assign each node (id) its most frequent url and type values:

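import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // for $"..."; already imported in spark-shell

val windowSpec = Window.partitionBy("id", "url", "type")
val result = df.withColumn("count", count("url").over(windowSpec))
  .orderBy($"count".desc) // this ordering is not guaranteed to survive groupBy
  .groupBy("id")
  .agg(
    first("url").as("URL"),
    first("type").as("Type")
  )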

But I actually need to give priority to the most frequent non-null url, so that I get this result:

+-----+-------------+--------+--------------------+
|   id|         name|    type|                 url|
+-----+-------------+--------+--------------------+
|    1|      NT Note|    aaaa|http://www.teleab...|
|    2|          ABC|    bbbb|                null|
+-----+-------------+--------+--------------------+

Right now I get the output below, because for id 1 null occurs more often than the url:

+-----+-------------+--------+--------------------+
|   id|         name|    type|                 url|
+-----+-------------+--------+--------------------+
|    1|      NT Note|    aaaa|                null|
|    2|          ABC|    bbbb|                null|
+-----+-------------+--------+--------------------+
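In the window code above, the orderBy before groupBy does not control which row first(...) returns: Spark does not guarantee that a sort order survives the groupBy shuffle, and its documentation describes first as non-deterministic for that reason. A UDF-free alternative is sketched below, assuming name and type are constant within each id (as in the sample) and using hypothetical placeholder URLs, since the real ones are truncated. It counts each url per id, then keeps the top row with row_number over a window that sorts null urls last.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object MostFrequentUrlWindow {
  // Hypothetical sample mirroring the question; the URLs are placeholders.
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq[(Int, String, String, Option[String])](
      (1, "NT Note", "aaaa", None),
      (1, "NT Note", "aaaa", Some("http://example.com/a")),
      (1, "NT Note", "aaaa", Some("http://example.com/a")),
      (1, "NT Note", "aaaa", None),
      (1, "NT Note", "aaaa", None),
      (2, "ABC", "bbbb", None),
      (2, "ABC", "bbbb", None)
    ).toDF("id", "name", "type", "url")
  }

  // One row per id: the most frequent non-null url, or null if the id has only nulls.
  // Assumes name and type do not vary within an id.
  def mostFrequentUrl(df: DataFrame): DataFrame = {
    // Count each (id, name, type, url) combination; null urls form their own group.
    val counts = df.groupBy("id", "name", "type", "url").count()
    // Within each id: non-null urls first (false sorts before true),
    // then higher counts, then url itself as a deterministic tie-break.
    val w = Window.partitionBy("id")
      .orderBy(col("url").isNull.asc, col("count").desc, col("url").asc)
    counts
      .withColumn("rn", row_number().over(w))
      .filter(col("rn") === 1)
      .drop("rn", "count")
      .orderBy("id")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("most-frequent-url").getOrCreate()
    mostFrequentUrl(sample(spark)).show(false)
    spark.stop()
  }
}
```

Sorting on url.isNull first is what gives non-null values priority; an id whose urls are all null still yields exactly one row, with a null url.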

You can do this with a UDF, as shown below:
import org.apache.spark.sql.functions._

// Returns the most frequent non-null url, or null if there is none.
// Seq[String] accepts whatever Seq subtype Spark passes for an array<string>
// column (WrappedArray on Scala 2.12) and also compiles on Scala 2.13.
def mfnURL(arr: Seq[String]): String = {
  val filterArr = arr.filterNot(_ == null)
  if (filterArr.isEmpty) null
  else filterArr.groupBy(identity).maxBy(_._2.size)._1
}

// Register mfnURL as a UDF
val mfnURLUDF = udf(mfnURL _)

// Group, collect the urls of each group (collect_list skips nulls), then apply the UDF
df.groupBy("id", "name", "type").agg(mfnURLUDF(collect_list("url")).alias("url")).show
//Sample output
+---+-------+----+--------------------+
| id|   name|type|                 url|
+---+-------+----+--------------------+
|  2|    ABC|bbbb|                null|
|  1|NT Note|aaaa|http://www.teleab...|
+---+-------+----+--------------------+
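The core of mfnURL is plain Scala, so it can be checked without a Spark session. The sketch below is a variant, not the answer's exact code: it returns Option instead of null, and it breaks ties between equally frequent urls by taking the lexicographically smallest one, whereas maxBy on the grouped Map returns whichever tied entry the Map happens to iterate first. The URLs are hypothetical placeholders.

```scala
object MostFrequentNonNull {
  // Most frequent non-null value; ties go to the lexicographically smallest value.
  // Returns None when the input contains no non-null values.
  def mostFrequentNonNull(values: Seq[String]): Option[String] = {
    val nonNull = values.filter(_ != null)
    if (nonNull.isEmpty) None
    else {
      val counts = nonNull.groupBy(identity).map { case (v, occ) => (v, occ.size) }
      // Minimising the key (-count, value) picks the highest count, then alphabetical order.
      Some(counts.minBy { case (v, n) => (-n, v) }._1)
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical URLs standing in for the truncated values in the question.
    val id1 = Seq(null, "http://example.com/a", "http://example.com/a", null, null)
    val id2 = Seq[String](null, null, null, null)
    val tie = Seq("http://example.com/b", "http://example.com/a")
    println(mostFrequentNonNull(id1)) // Some(http://example.com/a)
    println(mostFrequentNonNull(id2)) // None
    println(mostFrequentNonNull(tie)) // Some(http://example.com/a)
  }
}
```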
