我对Spark很陌生。我在 1.6.1 中工作。假设我有一个大文件,我正在通过文本文件将其读取到 RDD[字符串] 中。然后我想验证某个函数中的每一行。因为文件很大,所以我想在达到一定数量的错误时停止处理,比如说 1000 行。类似的东西
val rdd = sparkContext.textFile(fileName)
rdd.map(line => myValidator.validate(line))
这是验证函数:
def validate(line:String) : (String, String) = {
// 1st in Tuple for resulted line, 2nd ,say, for validation error.
}
如何计算"验证"中的错误?它实际上是在多个节点上并行执行的?广播?蓄电池?
Spark 的懒惰来实现此行为,方法是将分析结果"拆分"为成功和失败,对失败调用take(n)
,并且仅在失败次数少于 n
次时才使用成功数据。
为了更方便地实现这一点,我建议更改 validate
的签名以返回一些可以轻松区分成功和失败的类型,例如 scala.util.Try
:
def validate(line:String) : Try[String] = {
// returns Success[String] on success,
// Failure (with details in the exception object) otherwise
}
然后,像这样:
val maxFailures = 1000
val rdd = sparkContext.textFile(fileName)
val parsed: RDD[Try[String]] = rdd.map(line => myValidator.validate(line)).cache()
val failures: Array[Throwable] = parsed.collect { case Failure(e) => e }.take(maxFailures)
if (failures.size == maxFailures) {
// report failures...
} else {
val success: RDD[String] = parsed.collect { case Success(s) => s }
// continue here...
}
为什么会这样?
- 如果失败次数少于 1000 次,则调用
take(maxFailures)
时将解析整个数据集,成功数据将被缓存并可供使用 - 如果有 1000 次或更多失败,解析将就此停止,因为
take
操作不再需要读取