I am trying to read a file on S3 with Apache Spark (PySpark) using the commands below and get the error shown further down. The file is compressed JSON, partitioned by year/month/day/hour/minute. I have successfully pulled several months of data from this store, but for this particular file I get the error. I am running this on an AWS SageMaker notebook attached to an AWS EMR cluster.
Command
# both fail
spark.read.json('s3://my_bucket/my_prefix/2021/08/31/08/53/').show()
spark.read.json('s3://my_bucket/my_prefix/2021/08/31/08/53/my_file.gz').show()
Error
An error was encountered:
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 4 times, most recent failure: Lost task 0.3 in stage 10.0 (TID 22) (ip-10-251-32-235.eu-west-1.compute.internal executor 19): java.io.FileNotFoundException: No such file or directory 's3://my_bucket/my_prefix/2021/08/31/08/53/my_file.gz'
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:194)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:240)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:159)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:85)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:80)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:621)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:397)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:232)
What I have tried:
- Restarting the Spark cluster
- Verifying the file; it was last updated 10 days before this error occurred
- Verifying that this is a Spark-specific issue; I can see the file with the non-Spark command below
# this command successfully prints out the erroneous file
import boto3
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('my_bucket')
for my_bucket_object in my_bucket.objects.filter(Prefix='my_prefix/2021/08/31/08/53'):
    print(my_bucket_object)
- Creating a table and refreshing it (a path-based refresh variant is sketched after this snippet)
# Tried this
spark.sql("CREATE OR REPLACE TEMPORARY VIEW bad_file USING json OPTIONS" +
" (path 's3://my_bucket/my_prefix/2021/08/31/08/53/')")
spark.sql('REFRESH TABLE bad_file')
# And this
spark.read.json('s3://my_bucket/my_prefix/2021/08/31/08/53/my_file.gz').cache().unpersist()
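For reference, a minimal sketch of the path-based refresh variant mentioned above. spark.catalog.refreshByPath is the standard PySpark call for invalidating Spark's cached file listing for a path; whether it changes anything for this particular failure is only an assumption:
# Sketch: invalidate Spark's cached listing/metadata for this exact S3 path, then retry the read
path = 's3://my_bucket/my_prefix/2021/08/31/08/53/'
spark.catalog.refreshByPath(path)  # drop cached file listing/metadata for this path
spark.catalog.clearCache()         # drop all cached DataFrames as well (blunt, but thorough)
spark.read.json(path).show()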
I see this is a common problem, but most people seem to resolve it by refreshing the table. Any help is much appreciated.
Try this: open the file in your S3 bucket and check the list of S3 actions granted on it via ACLs.
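If it helps, a minimal way to inspect that from the same notebook with boto3 (get_object_acl is the standard boto3 call; the bucket and key names below are the placeholders from the question and may need adjusting):
# Sketch: print the ACL grants on the exact object Spark fails to read,
# to confirm the identity running the EMR executors is actually allowed to read it
import boto3

s3_client = boto3.client('s3')
acl = s3_client.get_object_acl(
    Bucket='my_bucket',
    Key='my_prefix/2021/08/31/08/53/my_file.gz',
)
print(acl['Owner'])
for grant in acl['Grants']:
    print(grant['Grantee'], grant['Permission'])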