如何重命名运行在Dataproc Serverless上的Spark中的GCS文件?



将spark数据帧写入文件后,我试图使用下面的代码重命名文件:

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val file = fs.globStatus(new Path(path + "/part*"))(0).getPath().getName()
fs.rename(new Path(path + "/" + file), new Path(path + "/" + fileName))

在本地运行Spark效果很好…但是,当我在Dataproc上运行jar时,我得到如下错误:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong bucket: prj-***, in path: gs://prj-*****/part*, expected bucket: dataproc-temp-***

似乎文件可能直到作业结束才保存到目标桶中,因此很难重命名它们。我已经尝试过使用.option("mapreduce.fileoutputcommitter.algorithm.version", "2"),因为我读到了一些看起来很有前途的东西。

更新:还是没有运气。看起来spark.sparkContext.hadoopConfiguration期望基础桶是dataproc-temp-*桶。下面的完整堆栈跟踪:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong bucket: prj-**, in path: gs://p**, expected bucket: dataproc-temp-u***
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.checkPath(GoogleHadoopFileSystem.java:95)
at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:667)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.makeQualified(GoogleHadoopFileSystemBase.java:394)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.getGcsPath(GoogleHadoopFileSystem.java:149)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1085)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1059)

FileSystem.get(...)调用返回的HCFS实例绑定到特定的FS(在本例中为GCS桶)。默认情况下,Dataproc Serverless Spark通过spark.hadoop.fs.defaultFSSpark属性配置为使用gs://daptaproc-temp-*/bucket作为默认HCFS。

要解决这个问题,您需要使用FileSystem#get(URI uri, Configuration conf)调用创建HCFS实例:

val fs = FileSystem.get(path.toUri, spark.sparkContext.hadoopConfiguration)

最新更新