在使用SaveAsTextFile时,为什么在Google DataProc中运行的Spark会在外部存储(GCS)上存



我已经运行以下pyspark代码:

from pyspark import SparkContext
sc = SparkContext()
data = sc.textFile('gs://bucket-name/input_blob_path')
sorted_data = data.sortBy(lambda x: sort_criteria(x))
sorted_data.saveAsTextFile(
    'gs://bucket-name/output_blob_path',
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec"
)

工作成功完成。但是,在作业执行期间,火花在以下路径gs://bucket-name/output_blob_path/_temporary/0/中创建了许多临时斑点。我意识到,最后消除所有这些临时斑点花费了一半的工作执行时间,而在此期间,CPU利用率为1%(巨大的资源浪费)。

有没有办法将临时文件存储在本地驱动器(或HDFS)而不是GCP上?我仍然想将最终结果(排序数据集)持续到GCP。

我们使用的是带有10个工作节点的DataProc Spark群集(VM Type 16ce,60gm)。输入数据的音量为10TB。

您看到的 _temporary文件可能是在引擎盖下使用的fileOutputcommitter的工件。重要的是,这些临时斑点不是严格的"临时"数据,而是完成了输出数据,这些数据仅被"重命名"到工作完成时的最终目的地。这些文件通过重命名的"提交"实际上很快,因为源和目的地都在GC上。因此,无法将临时文件放置在HDFS上,然后将其"投入"到GC中,因此无法替换工作流程的那部分,因为这样的提交将需要将整个输出数据集重新置于HDF中,从HDF重新返回到GCS中。具体而言,基础的Hadoop FileOutputFormat类不支持这样的成语。

gcs本身不是一个真正的文件系统,而是"对象存储",而DataProc中的GCS连接器仅模仿HDFS,它的能力最好。结果之一是,删除目录填充的文件实际上需要GC在引擎盖下删除单个对象,而不是真实的文件系统,而只是UNINK inode。

实际上,如果您要击中此功能,则可能意味着您的输出反正分为太多文件,因为一次清理确实发生了〜1000个文件。因此,通常不应明显慢慢来慢。拥有太多文件也将使未来在这些文件上工作较慢。最简单的修复通常只是为了尽可能减少输出文件的数量,例如使用repartition()

from pyspark import SparkContext
sc = SparkContext()
data = sc.textFile('gs://bucket-name/input_blob_path')
sorted_data = data.sortBy(lambda x: sort_criteria(x))
sorted_data.repartition(1000).saveAsTextFile(
    'gs://bucket-name/output_blob_path',
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec"
)

我有与您以前相同的问题。我的博客:Spark Speedup将文件写入云存储。然后,我发现这篇文章Spark 2.0.0群集需要更长的时间来附加数据。

如果发现使用SPARK 2.0.0版本的群集需要更长的时间将数据附加到现有数据集,尤其是所有Spark作业都已经完成,但是您的命令尚未完成,这是因为驱动程序节点是将任务的输出文件从作业临时目录移至最终目标,该目的地逐一将云存储速度慢。要解决此问题,请将mapReduce.fileOutputcommitter.algorithm.version设置为2。请注意,此问题不会影响覆盖数据集或将数据写入新位置。

当我们将GC用作TMP存储时,此问题将在云环境中放大。

如何修复它?

您只需添加此参数即可解决此问题,这意味着您在将文件保存到GCS时不会创建TMP文件。

write.option("mapreduce.fileoutputcommitter.algorithm.version", "2")

警告!

由于数据丢失的机会,DirectParquetutputCommitter被从SPARK 2.0中删除。

最新更新