如果-else,则无法从内部将记录添加到RDD



我有一个RDD,我想将这个RDD与另一个RDD(具有相同类型的内容("连接"起来,union是一个合适的方法。但是,在连接rdd之前,我想确保我的集合满足一些要求(因此if语句(,然后合并rdds。不幸的是,下面代码中描述的联合不会在循环之外保留。有没有办法做到这一点?从内部增量向原始RDDrdd添加更多条目 if-else ?如果没有if_else工会就可以了。

var rdd = sc.parallelize(Seq[String]())
val (!collection.isEmpty) {
val value = collection.map(_._2)
rdd.union(value)
}

根据我从你的问题中了解到的,你有两个rdds

val rdd = sc.parallelize(Seq("a", "b", "c"))
val collection = sc.parallelize(Seq((1, "d"), (2, "e")))

并且您希望在检查一些条件后union它们,这些条件可以通过定义函数来完成

def unionRdd(originalRdd: RDD[String], testCollection: RDD[(Int, String)]): RDD[String] ={
if(!collection.isEmpty()){
val value = collection.map(_._2)
originalRdd.union(value)
}
else{
originalRdd
}
}

您可以将该函数调用为

val unionedRDD = unionRdd(rdd, collection)

在检查函数中定义的某些条件后,应该会给你unionRdd级联RDD

我希望答案对您有所帮助

最新更新