在出现一定数量的错误后停止处理 Apache Spark 中的大型文本文件



我对Spark很陌生。我在 1.6.1 中工作。假设我有一个大文件,我正在通过文本文件将其读取到 RDD[字符串] 中。然后我想验证某个函数中的每一行。因为文件很大,所以我想在达到一定数量的错误时停止处理,比如说 1000 行。类似的东西

val rdd = sparkContext.textFile(fileName) rdd.map(line => myValidator.validate(line))

这是验证函数:

def validate(line:String) : (String, String) = { // 1st in Tuple for resulted line, 2nd ,say, for validation error. }

如何计算"验证"中的错误?它实际上是在多个节点上并行执行的?广播?蓄电池?

您可以使用

Spark 的懒惰来实现此行为,方法是将分析结果"拆分"为成功和失败,对失败调用take(n),并且仅在失败次数少于 n 次时才使用成功数据。

为了更方便地实现这一点,我建议更改 validate 的签名以返回一些可以轻松区分成功和失败的类型,例如 scala.util.Try

def validate(line:String) : Try[String] = {
    // returns Success[String] on success, 
    // Failure (with details in the exception object) otherwise 
}

然后,像这样:

val maxFailures = 1000
val rdd = sparkContext.textFile(fileName)
val parsed: RDD[Try[String]] = rdd.map(line => myValidator.validate(line)).cache()
val failures: Array[Throwable] = parsed.collect { case Failure(e) => e }.take(maxFailures)
if (failures.size == maxFailures) { 
  // report failures... 
} else {
  val success: RDD[String] = parsed.collect { case Success(s) => s }
  // continue here...
}

为什么会这样?

  • 如果失败次数少于 1000 次,则调用 take(maxFailures) 时将解析整个数据集,成功数据将被缓存并可供使用
  • 如果有 1000 次或更多失败,解析将就此停止,因为take操作不再需要读取

最新更新