平面地图是否比筛选器+地图提供更好的性能?



我有一个相当大的数据集(1亿+条记录和100列),我正在使用Spark进行处理。我正在将数据读入 Spark 数据集,我想过滤此数据集并将其字段的子集映射到案例类。

代码看起来有些相似,

case class Subset(name:String,age:Int)
case class Complete(name:String,field1:String,field2....,age:Int)
val ds = spark.read.format("csv").load("data.csv").as[Complete]
#approach 1
ds.filter(x=>x.age>25).map(x=> Subset(x.name,x.age))
#approach 2
ds.flatMap(x=>if(x.age>25) Seq(Subset(x.name,x.age)) else Seq.empty)

哪种方法更好?关于如何使此代码性能更高的任何其他提示?

谢谢!

编辑

我运行了一些测试来比较运行时,看起来方法 2 相当快,我用于获取运行时的代码如下,

val subset = spark.time {
ds.filter(x=>x.age>25).map(x=> Subset(x.name,x.age))
}
spark.time {
subset.count()
}
and 
val subset2 = spark.time {
ds.flatMap(x=>if(x.age>25) Seq(Subset(x.name,x.age)) else Seq.empty)
}
spark.time {
subset2.count()
}

更新:我最初的答案包含一个错误:Spark确实支持Seq作为flatMap的结果(并将结果转换回Dataset)。对混乱表示歉意。我还添加了有关提高分析性能的更多信息。

更新2:我错过了您使用的是Dataset而不是RDD(doh!这不会显着影响答案。

Spark是一个分布式系统,它跨多个节点对数据进行分区并并行处理数据。就效率而言,导致重新分区(需要在节点之间传输数据)的操作在运行时方面比就地修改要昂贵得多。另外,您应该注意,仅转换数据的操作,例如filtermapflatMap等,仅存储,并且在执行操作操作(例如reducefoldaggregate等)之前不会执行。因此,就目前的情况而言,这两种选择实际上都没有起到任何作用。

当对这些转换的结果执行操作时,我希望filter操作更有效:它只处理传递谓词x=>x.age>25的数据(使用后续map操作)(通常写为_.age > 25)。虽然看起来filter会创建一个中间集合,但它会延迟执行。因此,Spark似乎将filtermap操作融合在一起。

坦率地说,你的flatMap行动是可怕的。它强制处理、序列创建和随后对每个数据项进行扁平化,这肯定会增加整体处理。

也就是说,提高分析性能的最佳方法是控制分区,以便将数据大致平均地分布在尽可能多的节点上。请参阅本指南作为良好的起点。

从语法的逻辑来看,第一种方法应该使用更少的空间,因为flatMap扩展到.map().flatten,两者都在大小相等的参数上。它在 Scala REPL 中编译为相同的 Java 字节码(编辑:当使用宠物示例时,这显然不能补偿实际使用相对较大的数据对其进行测试)。

最新更新