我试图对我的数据帧执行一个非常具体的操作,但我找不到很好的方法。
我有一个看起来像这样的数据帧:
+------------------+----------------+--------+
|CIVILITY_PREDICTED|COUNTRY_CODE_PRE| name|
+------------------+----------------+--------+
| M| CA|A HANNAN|
| M| CA| A JAY|
| M| GB| A JAY|
| M| CA| A K I L|
| F| CA| A LAH|
| ?| CN| A LIAN|
| ?| CN| A MEI|
| ?| CN| A MIN|
| F| CA| A RIN|
| M| CA| A S M|
| ?| CN| A YING|
| F| CA|AA ISHAH|
| M| CA| AABAN|
| M| GB| AABAN|
| M| US| AABAN|
| M| GB| AABAS|
| F| CA| AABEER|
| M| CA| AABEL|
| F| US| AABHA|
| F| GB| AABIA|
+------------------+----------------+--------+
正如你在CIVILITY_PREDICTED中看到的,我有一些"&"。每";name";每个国家有一行,有时CIVILITY_PREDICTED是"代表一个国家,但不代表另一个同名国家。
所以基本上我想要每个"为名称添加基于其他国家的最常见的CIVILITY_PREDICTED。
我试着这样做(e是数据帧,to_predict是另一个只有我想要的名称的数据帧(:
e.filter($"CIVILITY_PREDICTED" === "?" && $"name".isNotNull)
.select("COUNTRY_CODE_PRE","CIVILITY_PREDICTED","name").
collect().map(a => {
to_predict
.filter($"name" === a.get(3))
.filter( $"CIVILITY_PREDICTED" !== "?")
.groupBy("CIVILITY_PREDICTED")
.count()
.agg(org.apache.spark.sql.functions.max("CIVILITY_PREDICTED")).show()
有了这个,我得到了每个名称出现次数最多的CIVLITY_PREDICTED,但我想它不是很理想,我不知道如何替换相应的"在这个数据帧中。
有人知道吗?非常感谢
窗口函数是这里的关键。以下解决方案使用first_value根据行数选择第一个性别值。
spark.sql("""select distinct name, first_value(CIVILITY_PREDICTED) over (partition by name order by count(*) desc) civility
from civ
group by name, CIVILITY_PREDICTED
""").show
根据如下所示重新创建的数据,这将返回:
+-----+--------+
| name|civility|
+-----+--------+
|AABAN| M|
+-----+--------+
查看原始值以及最常见的值:
spark.sql("""select name, CIVILITY_PREDICTED,
first(CIVILITY_PREDICTED)
over (partition by name order by count(*) desc) civility
from civ
group by 1,2
order by 1,2
""").show
返回
+-----+------------------+--------+
| name|CIVILITY_PREDICTED|civility|
+-----+------------------+--------+
|AABAN| ?| M|
|AABAN| M| M|
+-----+------------------+--------+
我只重新创建了一个名称,其中包含您试图解决的问题。AABAN是?用于一行,而'M'用于另外两行。
val civ = """+------------------+----------------+--------+
|CIVILITY_PREDICTED|COUNTRY_CODE_PRE| name|
+------------------+----------------+--------+
| ?| CA| AABAN|
| M| GB| AABAN|
| M| US| AABAN|""".stripMargin.replaceAll("\+", "").replaceAll("\-", "").split("n").filter(_.size>10)
val df = spark.read
.option("ignoreTrailingWhiteSpace", "true")
.option("ignoreLeadingWhiteSpace", "true")
.option("delimiter", "|")
.option("header", "true")
.csv(spark.sparkContext.parallelize(civ).toDS)
.drop("_c3")
df.createOrReplaceTempView("civ")
df.orderBy("name").show(99)
+------------------+----------------+-----+
|CIVILITY_PREDICTED|COUNTRY_CODE_PRE| name|
+------------------+----------------+-----+
| ?| CA|AABAN|
| M| GB|AABAN|
| M| US|AABAN|
+------------------+----------------+-----+