具有gzip格式的大型文本文件的Spark作业



我正在运行一个Spark作业,该作业处理输入文件的时间太长。输入文件为6.8GB的Gzip格式,包含110M行文本。我知道它是Gzip格式的,所以它是不可拆分的,并且只有一个执行器将用于读取该文件。

作为调试过程的一部分,我决定看看将gzip文件转换为镶木地板需要多长时间。我的想法是,一旦我转换为镶木地板文件,然后如果我在该文件上运行我的原始Spark作业,在这种情况下,它将使用多个执行器,并且输入文件将被并行处理。

但即使是这份小工作也比我预期的要花很长时间。这是我的代码:

val input = sqlContext.read.text("input.gz")
input.write.parquet("s3n://temp-output/")

当我在笔记本电脑(16GB RAM)中提取该文件时,不到2分钟。当我在Spark集群上运行它时,我预计它将花费相同甚至更少的时间,因为我使用的执行器内存是58GB。花了大约20分钟。

我在这里错过了什么?如果这听起来很业余,我很抱歉,但我是Spark的新手。

在gzip文件上运行Spark作业的最佳方式是什么?假设我没有以其他文件格式(bzip2、snappy、lzo)创建该文件的选项。

在执行Spark作业的输入流程输出类型时,需要考虑三个独立的问题:

  1. 输入并行度
  2. 处理并行性
  3. 输出并行性

在您的情况下,输入并行度为1,因为在您的问题中,您声称无法更改输入格式或粒度。

你也基本上没有进行任何处理,所以你无法在那里获得任何收益。

但是,您可以控制输出并行性,这将给您带来两个好处:

  • 多个CPU将进行写入,从而减少写入操作的总时间。

  • 您的输出将被拆分为多个文件,以便在以后的处理中利用输入并行性。

要提高并行性,必须增加分区的数量,这可以通过repartition()来实现,例如

val numPartitions = ...
input.repartition(numPartitions).write.parquet("s3n://temp-output/")

在选择分区的最佳数量时,需要考虑许多不同的因素。

  • 数据大小
  • 奇偶校验偏斜
  • 群集RAM大小
  • 群集中的核心数量
  • 您将执行的后续处理类型
  • 用于后续处理的集群(RAM和内核)的大小
  • 您正在写入的系统

在不知道自己的目标和限制的情况下,很难提出可靠的建议,但以下是一些通用指南:

  • 由于您的分区不会偏斜(上面使用的repartition将使用一个哈希分区器来校正偏斜),如果您将分区的数量设置为执行器核心的数量,假设您使用的节点具有足够的I/O,则您将获得最快的吞吐量。

  • 当您处理数据时,您确实希望整个分区能够"适应"分配给单个执行器核心的RAM。"合适"在这里的含义取决于您的处理。如果您正在进行简单的map转换,则数据可能会被流式传输。如果你正在做一些涉及订购的事情,那么RAM需要大幅增长。如果您使用的是Spark 1.6+,您将获得更灵活的内存管理优势。如果您使用的是早期版本,则必须更加小心。当Spark必须开始对磁盘进行"缓冲"时,作业执行就会停止。磁盘上的大小和RAM中的大小可能非常非常不同。后者根据处理数据的方式以及Spark可以从谓词下推中获得的好处而有所不同(Parquet支持这一点)。使用Spark UI可以查看各个作业阶段占用的RAM量。

BTW,除非您的数据具有非常特定的结构,否则不要硬编码分区号,因为这样您的代码将在不同大小的集群上以次优方式运行。相反,使用以下技巧来确定集群中执行器的数量。然后,您可以根据所使用的机器乘以每个执行器的核心数量。

// -1 is for the driver node
val numExecutors = sparkContext.getExecutorStorageStatus.length - 1

作为参考,在我们的团队中,我们使用相当复杂的数据结构,这意味着RAM大小>>磁盘大小,我们的目标是将S3对象保持在50-250Mb范围内,以便在每个执行器核心具有10-20Gb RAM的节点上进行处理。

希望这能有所帮助。

相关内容

  • 没有找到相关文章

最新更新