我是Spark/Scala的新手。我最初的RDD是类型记录,记录的布局是:
a_key, b_key,c_key,f_name,l_name,address
现在我必须:
- 删除具有A_KEY或B_KEY或C_KEY的记录为null/empty
- 我必须同时更新无效记录的计数器。
我已经尝试过:
sc.register( recordStatsAccumulator, "Stat accumulator for " + filename )
val nullFilteredRecords = records.map{ record =>
if( record.A_KEY.isEmpty ||
record.B_KEY.isEmpty ||
record.C_KEY.isEmpty )
{
recordStatsAccumulator.add( ValidationLoggingUtil.INVALID )
}
record
}
.filter( record =>
!record.A_KEY.isEmpty &&
!record.B_KEY.isEmpty &&
!record.C_KEY.isEmpty
)
但是,此代码并不有效,因为它将整个RDD thorugh两次。首先,要更新无效的记录计数器,然后再次删除无效的记录。
有更好/有效的方法吗?
我认为您可以一步将两个操作结合。这样:
val nullFilteredRecords = records.filter { record =>
if( record.A_KEY.isEmpty ||
record.B_KEY.isEmpty ||
record.C_KEY.isEmpty ) {
recordStatsAccumulator.add( ValidationLoggingUtil.INVALID )
}
!record.A_KEY.isEmpty && !record.B_KEY.isEmpty && !record.C_KEY.isEmpty
}