在databricks上将高度分区的数据帧加速到s3



我在Databricks上运行一个笔记本,它创建分区的PySpark数据帧并将其上传到s3。有问题的表有大约5000个文件,总大小大约为5GB(需要以这种方式进行分区才能被Athena有效地查询(。我的问题是,将文件写入s3似乎是顺序的,而不是并行的,并且可能需要长达一个小时的时间。例如:

df.repartition("customer_id")
.write.partitionBy("customer_id")
.mode("overwrite")
.format("parquet")
.save("s3a://mybucket/path-to-table/")

我已经在AWS上启动了我的集群(i3.xlarge(,配置如下:

spark.hadoop.orc.overwrite.output.file true
spark.databricks.io.directoryCommit.enableLogicalDelete true
spark.sql.sources.commitProtocolClass org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
parquet.enable.summary-metadata false
spark.hadoop.fs.s3.maxRetries 20
spark.databricks.hive.metastore.glueCatalog.enabled true
spark.hadoop.validateOutputSpecs false
mapreduce.fileoutputcommitter.marksuccessfuljobs false
spark.sql.legacy.parquet.datetimeRebaseModeInRead CORRECTED
spark.hadoop.fs.s3.consistent.retryPeriodSeconds 10
spark.speculation true
spark.hadoop.fs.s3.consistent true
spark.hadoop.fs.s3.consistent.retryCount 5

在这种情况下,如果我有很多小文件需要快速写入s3,建议使用什么方法?

我看到了你写得慢并且可以加快的几个原因:

  1. 您可能有5000多名客户?因此,使用分区by,您可能有超过5000个分区。由于元存储的开销,Parquet(非三角洲湖表(的速度可能非常慢。我认为你不想要这么多分区
  2. 对于5GB的5000个文件,每个文件的大小约为1MB。这个很小。对于这个问题,您写出的文件大小应该接近100MB
  3. 默认的集群选项设计得很好,我很少需要更改它们,当我更改时,我会启用新功能。您应该尝试解决上述项目,同时删除设置上的所有这些覆盖
  4. Repartition("customer_id"(和partitionBy("customer_id"(是多余的

推荐:

  1. 如果您的上一个阶段创建了>50个分区
  2. 按customer_id去掉分区,也许您可能认为这是有充分理由的,但小文件和大量分区正在扼杀您的性能
  3. 尝试开放的DeltaLake格式(例如CREATE TABLE ... USING DELTA LOCATION ...(。这将加快客户的选择性查询,如果您也使用OPTIMIZE ... ZORDER BY customer_id,则customer_id上的联接将加快,并且可以自动优化文件的大小

最终结果看起来更干净:

df.coalesce(50)
.write
.mode("overwrite")
.format("delta")
.save("s3a://mybucket/path-to-table/")

请参阅有关自动优化选项以自动调整文件大小的信息:https://docs.databricks.com/delta/optimizations/auto-optimize.html#usage

三角洲湖表可与Athena一起使用https://docs.databricks.com/delta/presto-integration.html#presto-和athena到delta湖一体化

在s3 bucket上,是否设置了fs.s3a.fast.upload = true?我在这个链接上看到一张类似的票

最新更新