Spark RDD 相当于 Scala 集合分区



这是我的一个火花工作的小问题,似乎没有引起任何问题 - 但每次看到它时都会让我烦恼,并且未能提出更好的解决方案。

假设我有一个这样的 Scala 集合:

val myStuff = List(Try(2/2), Try(2/0))

我可以使用分区将此列表划分为成功和失败:

val (successes, failures) =  myStuff.partition(_.isSuccess)

这很好。分区的实现仅遍历源集合一次以生成两个新集合。但是,使用Spark,我能够设计的最好的等价物是这样的:

val myStuff: RDD[Try[???]] = sourceRDD.map(someOperationThatMayFail)
val successes: RDD[???] = myStuff.collect { case Success(v) => v }
val failures: RDD[Throwable] = myStuff.collect { case Failure(ex) => ex }

除了解压缩 Try 的差异(这很好)之外,还需要遍历数据两次。这很烦人。

有没有更好的Spark替代方案可以在没有多次遍历的情况下拆分RDD?即有一个这样的签名,其中分区具有Scala集合分区而不是RDD分区的行为:

val (successes: RDD[Try[???]], failures: RDD[Try[???]]) = myStuff.partition(_.isSuccess)

作为参考,我之前使用过类似以下内容来解决此问题。潜在的失败操作是从二进制格式反序列化某些数据,并且失败已经变得足够有趣,需要将它们处理并保存为 RDD 而不是记录的内容。

def someOperationThatMayFail(data: Array[Byte]): Option[MyDataType] = {
   try {
      Some(deserialize(data))
   } catch {
      case e: MyDesrializationError => {
         logger.error(e)
         None
      }
   }
}

可能还有其他解决方案,但在这里你可以:

设置:

import scala.util._
val myStuff = List(Try(2/2), Try(2/0))
val myStuffInSpark = sc.parallelize(myStuff)

执行:

val myStuffInSparkPartitioned = myStuffInSpark.aggregate((List[Try[Int]](),List[Try[Int]]()))(
  (accum, curr)=>if(curr.isSuccess) (curr :: accum._1,accum._2) else (accum._1, curr :: accum._2), 
  (first, second)=> (first._1 ++ second._1,first._2 ++ second._2))

如果您需要解释,请告诉我

相关内容

  • 没有找到相关文章