我有一个Dataframe,我想删除基于一个键的重复项,捕获是在记录共享相同的键,我需要选择基于一些列,而不仅仅是任何记录。
例如,我的DF看起来像:+------+-------+-----+
|animal|country|color|
+------+-------+-----+
| Cat|america|white|
| dog|america|brown|
| dog| canada|white|
| dog| canada|black|
| Cat| canada|black|
| bear| canada|white|
+------+-------+-----+
现在我想做删除重复基于列动物,然后选择那些有国家 '美国'。
我期望的输出应该是:
+------+-------+-----+
|animal|country|color|
+------+-------+-----+
| Cat|america|white|
| dog|america|brown|
| bear| canada|white|
+------+-------+-----+
由于Dataframe api中没有reduceBykey,我将其转换为keyValue对rdd,然后执行reduceBykey,我被困在函数中,该函数将在重复项中进行基于偏好的选择。
我更喜欢scala的示例代码
如果所有动物(狗,猫,…)至少有一个国家"america"的条目,并且您不介意在美国丢失重复匹配的动物,则可以使用reduceByKey
:
val animals = sc.parallelize(("cat","america", "white")::("dog","america","brown")::("dog","canada","white")::("dog","canada","black")::("cat","canada","black")::("bear","canada","white")::Nil)
val animalsKV = animals.map { case (k, a, b) => k -> (a,b) }
animalsKV.reduceByKey {
case (a @ ("america",_ ), _) => a
case (_, b) => b
}
如果您的动物在"america"中没有条目,则上面的代码将取其中一个重复项:最后一个。您可以通过在这些情况下提供维护副本的结果来改进它。例句:
animalsKV.combineByKey(
Map(_), // First met entries keep wrapped within a map from their country to their color
(m: Map[String, String], entry: (String, String)) =>
if(entry._1 == "america") Map(entry) // If one animal in "america" is found, then it should be the answer value.
else m + entry, //Otherwise, we keep track of the duplicates
(a: Map[String, String], b: Map[String, String]) => //When combining maps...
if(a contains "america") a // If one of them contains "america"
else if(b contains "america") b //... then we keep that map
else a ++ b // Otherwise, we accumulate the duplicates
)
该代码也可以修改以跟踪重复的"美国"动物。
我相信你可以在spark版本>= 1.4中使用windows(至少我认为它们是这样称呼的)完成你的dataframe。
但是使用rdd
val input: RDD[(String, String, Row)] = ???
val keyedByAnimal: RDD[(String, (String, Row))] =
input.map{case (animal, country, other) => (animal, (country, other)) }
val result: RDD[(String, (String, Row))] = keyedByAnimal.reduceByKey{(x, y) =>
if(x._1 == "america") x else y
}
上面为每个动物值提供了一个不同的值。选择哪个值是不确定的。我们所能说的是,如果存在一个带有"america"的动物值,其中一个将被选中。
关于你的评论:
val df: DataFrame = ???
val animalCol:String = ???
val countryCol: String = ???
val otherCols = df.columns.filter(c => c != animalCol && c!= countryCol)
val rdd: RDD[(String, String, Row)] =
df.select(animalCol, countryCol, otherCols:_ *).rdd.map(r => (r.getString(0), r.getString(1), r)
select对列重新排序,以便getString方法提取期望的值。老实说,看看Window aggreations。我不太了解他们,因为我不使用Dataframes或Spark超过1.3。