Writing Parquet to Azure Blob Storage: "One of the request inputs is not valid"



I want to write a simple DataFrame to Azure Blob Storage in Parquet format. Note that the snippet below works locally, so I suspect it must be something related to the Azure libraries. I also tried the Delta format, and it works (even though it uses Parquet under the hood).
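For reference, the Delta write that does succeed is essentially a one-liner (a minimal sketch, using the same df as in the reproduction below and the delta-core package from the --packages list):

df.write.format("delta").save("wasb://<container>@<account>.blob.core.windows.net/<path>")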

Using Spark 3.1.1, Scala 2.12.10, OpenJDK 1.8.0_292.

I set up my Spark session as usual, along the lines of:

$SPARK_HOME/bin/spark-shell \
(...cluster settings...) \
--conf spark.hadoop.fs.azure.account.key.<account>.blob.core.windows.net="${AZURE_BLOB_STORAGE_KEY}" \
--conf spark.hadoop.fs.AbstractFileSystem.wasb.impl=org.apache.hadoop.fs.azure.Wasb \
--conf spark.hadoop.fs.wasb.impl=org.apache.hadoop.fs.azure.NativeAzureFileSystem \
--conf spark.delta.logStore.class=org.apache.spark.sql.delta.storage.AzureLogStore \
--packages org.apache.hadoop:hadoop-azure:2.7.0,com.azure:azure-storage-blob:12.8.0,com.azure:azure-storage-common:12.8.0,com.microsoft.azure:azure-storage:2.0.0,io.delta:delta-core_2.12:0.8.0 \
(...other irrelevant settings...)
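For completeness, the same settings can also be applied programmatically when building the session; a minimal sketch with the same keys as above (the azureBlobStorageKey variable and the app name are placeholders of mine, not from the original setup):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parquet-to-wasb") // hypothetical app name
  // same account key that is passed via --conf above
  .config("spark.hadoop.fs.azure.account.key.<account>.blob.core.windows.net", azureBlobStorageKey)
  .config("spark.hadoop.fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
  .config("spark.hadoop.fs.wasb.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
  .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.AzureLogStore")
  .getOrCreate()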

I have tried other versions of the azure-storage-blob, azure-storage-common, and azure-storage packages, all leading to the same problem.

To reproduce the issue, I create a simple DataFrame and write it to storage:

val columns = Seq("language", "users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
val rdd = spark.sparkContext.parallelize(data)
val df = spark.createDataFrame(rdd).toDF(columns: _*)
df.show
// +--------+-----------+                                                                                                                                  
// |language|users_count|
// +--------+-----------+
// |    Java|      20000|
// |  Python|     100000|
// |   Scala|       3000|
// +--------+-----------+
df.write.parquet("wasb://<container>@<account>.blob.core.windows.net/<path>")

When writing in Parquet format I get a com.microsoft.azure.storage.StorageException: One of the request inputs is not valid exception:

21/09/21 13:38:14 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 83) (10.244.6.3 executor 6): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: One of the request inputs is not valid.
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2482)
at org.apache.hadoop.fs.azure.NativeAzureFileSystem$FolderRenamePending.execute(NativeAzureFileSystem.java:424)
at org.apache.hadoop.fs.azure.NativeAzureFileSystem.rename(NativeAzureFileSystem.java:1997)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:531)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:502)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:260)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:79)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:280)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
... 9 more
Caused by: com.microsoft.azure.storage.StorageException: One of the request inputs is not valid.
at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:162)
at com.microsoft.azure.storage.core.StorageRequest.materializeException(StorageRequest.java:307)
at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:177)
at com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob(CloudBlob.java:764)
at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:399)
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2449)
... 20 more

Any hints or ideas on what is causing this, or what to do to make it work? Thank you!

Two things helped me when writing to Azure Storage with the WASB protocol:

  1. The storage container should be Data Lake Gen1 (attempts with Gen2 failed)

  2. You need to add the following dependency/jar: org.codehaus.jackson:jackson-mapper-lgpl:1.9.13 (see the sketch below)
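A minimal sketch of pulling that jar in via Maven coordinates, assuming you simply append it to the --packages list from the question (only the hadoop-azure and delta-core entries are shown here for brevity):

$SPARK_HOME/bin/spark-shell \
--packages org.apache.hadoop:hadoop-azure:2.7.0,io.delta:delta-core_2.12:0.8.0,org.codehaus.jackson:jackson-mapper-lgpl:1.9.13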

I had to use WASB (since my Hadoop version (2.9.2) does not support ABFS), but if you have hadoop-2.10.1+, use ABFS instead.

Just change wasb to abfss and blob to dfs wherever they are used. For example, wasb://<container>@<account>.blob.core.windows.net/<path> becomes abfss://<container>@<account>.dfs.core.windows.net/<path>.
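A minimal sketch of the ABFS variant, assuming a Hadoop build that ships the ABFS driver (hadoop-azure 2.10.1+/3.x) and shared-key auth; the azureStorageKey variable is a placeholder of mine:

// Shared-key auth for ABFS; <account>, <container>, <path> are placeholders as above.
spark.sparkContext.hadoopConfiguration.set(
  "fs.azure.account.key.<account>.dfs.core.windows.net", azureStorageKey)

// Same write as before; only the scheme and endpoint change.
df.write.parquet("abfss://<container>@<account>.dfs.core.windows.net/<path>")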

This worked for me.
