我有一个RDD,我想将这个RDD与另一个RDD(具有相同类型的内容("连接"起来,union
是一个合适的方法。但是,在连接rdd之前,我想确保我的集合满足一些要求(因此if语句(,然后合并rdds。不幸的是,下面代码中描述的联合不会在循环之外保留。有没有办法做到这一点?从内部增量向原始RDDrdd
添加更多条目 if-else ?如果没有if_else工会就可以了。
var rdd = sc.parallelize(Seq[String]())
val (!collection.isEmpty) {
val value = collection.map(_._2)
rdd.union(value)
}
根据我从你的问题中了解到的,你有两个rdds
val rdd = sc.parallelize(Seq("a", "b", "c"))
val collection = sc.parallelize(Seq((1, "d"), (2, "e")))
并且您希望在检查一些条件后union
它们,这些条件可以通过定义函数来完成
def unionRdd(originalRdd: RDD[String], testCollection: RDD[(Int, String)]): RDD[String] ={
if(!collection.isEmpty()){
val value = collection.map(_._2)
originalRdd.union(value)
}
else{
originalRdd
}
}
您可以将该函数调用为
val unionedRDD = unionRdd(rdd, collection)
在检查函数中定义的某些条件后,应该会给你unionRdd
级联RDD
我希望答案对您有所帮助