无法使用 PySpark 写入 S3 - 状态代码为 400 (InvalidPart) 的 AmazonS3Exception



我有一个简单的火花作业,它做3件事:

  1. 从AWS S3逐月读取json数据(数据按日期分区(
  2. 对数据进行一些最小限度的处理
  3. 将已处理的月度数据覆盖到同一来源

作业成功处理并覆盖了几个月,但在覆盖到S3:时随机引发了此异常

由以下原因引起:com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3异常:找不到一个或多个指定的部件。零件可能尚未上载,或者指定的实体标记可能与零件的实体标记不匹配。(服务:Amazon S3;状态代码:400;错误代码:InvalidPart;请求ID:123;S3扩展请求ID:xyz/ck+foo/bar=(

以下是PySpark作业的代码片段:

spark = SparkSession.builder.appName('simple_app').getOrCreate()
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
source_data_lake_path = "s3://my-data-lake/data"
months_not_found, months_cleaned = [], []
for month in ['2020-01-*', '2020-02-*', '2020-03-*', ....]:
try:
data = spark.read.json(f"{source_data_lake_path}/persist_date={month}")
except AnalysisException:
months_not_found.append(month)
continue
cleaned_data, cleaned = clean_data(data)
if cleaned:
cleaned_data = cleaned_data.withColumn("persist_date", F.to_date(F.col("persist_timestamp")))
cleaned_data.repartition("persist_date").write.partitionBy("persist_date").mode("overwrite").json(
source_data_lake_path
)
months_cleaned.append(month)

我的发现:

  1. 使用s3://,因为s3://和s3n://在AWS EMR的上下文中在功能上是可互换的,而s3a://与EMR不兼容
  2. 我认为这是因为多个并发写入并减少了节点数量。尽管如此,由于此异常,作业有时会随机失败

我自己从未见过,但猜测:

当您的客户端正在上传数据时,另一个进程正在bucket上列出并中止未完成的上传

这可能是因为

  • 它是从cli或其他应用程序显式完成的
  • bucket设置为在一段时间后(例如24小时(自动删除未完成的上传
  • 其他一些作业正在向同一路径写入,其提交器正在列出并中止上载

要查看这是否是原因,请启用bucket的日志记录,然后查看事件发生时的日志,并在日志中查找"REST.DELETE.UPLOAD"条目。找出谁在做这件事,并告诉他们停止它。

最新更新