如何使用reduceByKey选择特定列上的记录



我有一个Dataframe,我想删除基于一个键的重复项,捕获是在记录共享相同的键,我需要选择基于一些列,而不仅仅是任何记录。

例如,我的DF看起来像:
+------+-------+-----+
|animal|country|color|
+------+-------+-----+
|   Cat|america|white|
|   dog|america|brown|
|   dog| canada|white|
|   dog| canada|black|
|   Cat| canada|black|
|  bear| canada|white|
+------+-------+-----+

现在我想做删除重复基于列动物,然后选择那些有国家 '美国'。

我期望的输出应该是:

+------+-------+-----+
|animal|country|color|
+------+-------+-----+
|   Cat|america|white|
|   dog|america|brown|
|  bear| canada|white|
+------+-------+-----+

由于Dataframe api中没有reduceBykey,我将其转换为keyValue对rdd,然后执行reduceBykey,我被困在函数中,该函数将在重复项中进行基于偏好的选择。

我更喜欢scala的示例代码

如果所有动物(狗,猫,…)至少有一个国家"america"的条目,并且您不介意在美国丢失重复匹配的动物,则可以使用reduceByKey:

val animals = sc.parallelize(("cat","america", "white")::("dog","america","brown")::("dog","canada","white")::("dog","canada","black")::("cat","canada","black")::("bear","canada","white")::Nil)
val animalsKV = animals.map { case (k, a, b) => k -> (a,b) }
animalsKV.reduceByKey {
    case (a @ ("america",_ ), _) =>  a
    case (_, b) => b
}

如果您的动物在"america"中没有条目,则上面的代码将取其中一个重复项:最后一个。您可以通过在这些情况下提供维护副本的结果来改进它。例句:

animalsKV.combineByKey(
    Map(_), // First met entries keep wrapped within a map from their country to their color
    (m: Map[String, String], entry: (String, String)) => 
        if(entry._1 == "america") Map(entry) // If one animal in "america" is found, then it should be the answer value.
        else m + entry, //Otherwise, we keep track of the duplicates
    (a: Map[String, String], b: Map[String, String]) => //When combining maps...
        if(a contains "america") a // If one of them contains "america"
        else if(b contains "america") b //... then we keep that map
        else a ++ b // Otherwise, we accumulate the duplicates
)

该代码也可以修改以跟踪重复的"美国"动物。

我相信你可以在spark版本>= 1.4中使用windows(至少我认为它们是这样称呼的)完成你的dataframe。

但是使用rdd

val input: RDD[(String, String, Row)] = ???
val keyedByAnimal: RDD[(String, (String, Row))] = 
    input.map{case (animal, country, other) => (animal, (country, other)) }
val result: RDD[(String, (String, Row))] = keyedByAnimal.reduceByKey{(x, y) =>
    if(x._1 == "america") x else y
}

上面为每个动物值提供了一个不同的值。选择哪个值是不确定的。我们所能说的是,如果存在一个带有"america"的动物值,其中一个将被选中。

关于你的评论:

val df: DataFrame = ???
val animalCol:String = ???
val countryCol: String = ???
val otherCols = df.columns.filter(c => c != animalCol && c!= countryCol)
val rdd: RDD[(String, String, Row)] = 
    df.select(animalCol, countryCol, otherCols:_ *).rdd.map(r => (r.getString(0), r.getString(1), r)

select对列重新排序,以便getString方法提取期望的值。老实说,看看Window aggreations。我不太了解他们,因为我不使用Dataframes或Spark超过1.3。

相关内容

  • 没有找到相关文章

最新更新