Apache Spark RDD过滤为两个RDD



我需要将RDD分成两部分:

1个满足条件的部分;另一部分没有。我可以在原来的RDD上做两次filter,但它似乎效率低下。有没有一种方法可以做到我想要的?我在API或文献中找不到任何东西。

Spark默认不支持此功能。如果事先缓存相同的数据,对其进行两次过滤并没有那么糟糕,而且过滤本身也很快。

如果它真的只是两种不同的类型,你可以使用一种辅助方法:

implicit class RDDOps[T](rdd: RDD[T]) {
  def partitionBy(f: T => Boolean): (RDD[T], RDD[T]) = {
    val passes = rdd.filter(f)
    val fails = rdd.filter(e => !f(e)) // Spark doesn't have filterNot
    (passes, fails)
  }
}
val (matches, matchesNot) = sc.parallelize(1 to 100).cache().partitionBy(_ % 2 == 0)

但是,一旦有了多种类型的数据,只需将过滤后的数据分配给一个新的值。

Spark RDD没有这样的api。

这是一个基于rdd.span拉取请求的版本,应该可以工作:

import scala.reflect.ClassTag
import org.apache.spark.rdd._
def split[T:ClassTag](rdd: RDD[T], p: T => Boolean): (RDD[T], RDD[T]) = {
    val splits = rdd.mapPartitions { iter =>
        val (left, right) = iter.partition(p)
        val iterSeq = Seq(left, right)
        iterSeq.iterator
    }
    val left = splits.mapPartitions { iter => iter.next().toIterator}
    val right = splits.mapPartitions { iter => 
        iter.next()
        iter.next().toIterator
    }
    (left, right)
}
val rdd = sc.parallelize(0 to 10, 2)
val (first, second) = split[Int](rdd, _ % 2 == 0 )
first.collect
// Array[Int] = Array(0, 2, 4, 6, 8, 10)

重点是,您不想做过滤器,而是想做映射。

(T) -> (Boolean, T)

对不起,我在Scala语法方面效率不高。但这个想法是通过将答案集映射到Key/Value对来分割答案集。Key可以是一个布尔值,指示它是否正在传递"Filter"谓词。

您可以通过分区处理来控制对不同目标的输出。只需确保您没有将并行处理限制为下游的两个分区。

另请参阅如何将RDD拆分为两个或多个RDD?

如果您可以使用T而不是RDD[T],那么您可以执行此操作。否则,你可能会做这样的事情:

val data = sc.parallelize(1 to 100)
val splitData = data.mapPartitions{iter => {
    val splitList = (iter.toList).partition(_%2 == 0)
    Tuple1(splitList).productIterator
  }
}.map(_.asInstanceOf[Tuple2[List[Int],List[Int]]])

然后,当您执行操作

时,您可能需要将其减少以合并列表

您可以使用subtract function(如果过滤操作过于昂贵)。

PySpark代码:

rdd1 = data.filter(filterFunction)
rdd2 = data.subtract(rdd1)

最新更新