Spark Scala:如何同时过滤RDD和更新计数器



我是Spark/Scala的新手。我最初的RDD是类型记录,记录的布局是:

a_key, b_key,c_key,f_name,l_name,address

现在我必须:

  • 删除具有A_KEY或B_KEY或C_KEY的记录为null/empty
  • 我必须同时更新无效记录的计数器。

我已经尝试过:

sc.register( recordStatsAccumulator, "Stat accumulator for " + filename )
val nullFilteredRecords = records.map{ record =>
  if( record.A_KEY.isEmpty ||
    record.B_KEY.isEmpty ||
    record.C_KEY.isEmpty )
  {
    recordStatsAccumulator.add( ValidationLoggingUtil.INVALID )
  }
  record
 }
 .filter( record =>
    !record.A_KEY.isEmpty &&
      !record.B_KEY.isEmpty &&
      !record.C_KEY.isEmpty
  )

但是,此代码并不有效,因为它将整个RDD thorugh两次。首先,要更新无效的记录计数器,然后再次删除无效的记录。

有更好/有效的方法吗?

我认为您可以一步将两个操作结合。这样:

val nullFilteredRecords = records.filter { record =>
  if( record.A_KEY.isEmpty ||
    record.B_KEY.isEmpty ||
    record.C_KEY.isEmpty ) {
    recordStatsAccumulator.add( ValidationLoggingUtil.INVALID )
  }
  !record.A_KEY.isEmpty && !record.B_KEY.isEmpty && !record.C_KEY.isEmpty
}

相关内容

  • 没有找到相关文章

最新更新