从 RDD 条目创建 RDD 在 foreach 循环中



我有一些自定义逻辑来查看RDD中的元素,并希望使用foreach通过UNION方法有条件地写入TempView,如下所示:

rddX.foreach{ x => {      
// Do something, some custom logic
...
val y = create new RDD from this RDD element x  
...
or something else   
// UNION to TempView
...
}}

一些我没有得到的非常基本的东西:

如何将RDD
  1. 的第n个条目(x)转换为长度为1的RDD本身?
  2. 或者,将第 n 个条目 (x) 直接转换为 DF?

我得到了所有基于集合的情况,但为了简单起见,我想在满足条件时立即附加。 即在RDD中的项目条目级别。

现在,在将 -1 作为 SO 41356419之前,我只是建议这样做,因为我有一个特定的用例并在 SPARK SQL 中改变 TempView,我确实需要这种方法 - 至少这是我的想法。不是一个典型的 SPARK 用例,但这就是我们/我所面临的。

提前致谢

首先,你不能在另一个RDD或DF/DS函数的foreach()中创建RDD或DF。但是你可以从RDD获取第n个元素,并用这个元素创建新的RDD。

编辑: 然而,解决方案要简单得多:

import org.apache.spark.{SparkConf, SparkContext}
object Main {
val conf = new SparkConf().setAppName("myapp").setMaster("local[*]")
val sc = new SparkContext(conf)
def main(args: Array[String]): Unit = {
val n = 534 // This is input value (index of the element we'ŗe interested in)
sc.setLogLevel("ERROR")
// Creating dummy rdd
val rdd = sc.parallelize(0 to 999).cache()
val singletonRdd = rdd.zipWithIndex().filter(pair => pair._1 == n)
}
}

希望对您有所帮助!

最新更新