Spark:加载时分组



通常我加载csv文件,然后用Spark运行不同类型的聚合,例如"group by"。我想知道是否有可能在文件加载期间(通常是数百万行)开始这种操作,而不是将它们顺序化,以及它是否值得(节省时间)。

的例子:

val csv = sc.textFile("file.csv")
val data = csv.map(line => line.split(",").map(elem => elem.trim))
val header = data.take(1)
val rows = data.filter(line => header(0) != "id")
val trows = rows.map(row => (row(0), row))
trows.groupBy(//row(0) etc.)

根据我对Spark工作原理的理解,groupBy(或aggregate)将被"推迟"到内存中加载整个文件csv。如果这是正确的,加载和分组是否可以在"同一"时间运行,而不是对两个步骤进行排序?

groupBy(或aggregate)将被"推迟"到内存中加载整个csv文件。

事实并非如此。在本地(单个分区)级别,Spark对惰性序列进行操作,因此属于单个任务的操作(包括映射侧聚合)可以压缩在一起。

换句话说,当您拥有方法链时,操作是逐行执行的,而不是逐个转换。换句话说,在访问下一行之前,第一行将被映射、过滤、再次映射并传递给聚合器。

通过on load操作启动一个组,您可以使用两个选项:

    写你自己的加载器,让成为你自己的组,在里面+ aggregationByKey。这样做的缺点是需要编写更多的代码。更多有关维修。
  1. 使用Parquet格式文件作为输入+ DataFrames,由于它是列式的,它将只读取groupBy中使用的所需列。所以它应该更快。——DataFrameReader

    df = spark.read.parquet('file_path')
    df = df.groupBy('column_a', 'column_b', '...').count()
    df.show()
    

由于Spark是懒惰的,它不会加载你的文件,直到你调用动作方法,如show/collect/write。因此,Spark将知道加载过程中哪些列被读取,哪些列被忽略。

最新更新