I am trying to write parquet files to HDFS and then copy them to S3.
I wrote the code in Zeppelin and there it runs fine: no problems at all, and the files show up under the S3 path.
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, dayofmonth, month, to_date, year}
import org.apache.hadoop.fs.{FileStatus, FileSystem, FileUtil, Path}

var outputFolder = "bucket_name/path"
println("\n ---- TASK 1 ----- \n writing with path " + outputFolder)
wholeParquetFile
.withColumn("date_col", to_date(col("timestamp"), "YYYYMMdd"))
.withColumn("year", year(col("date_col")))
.withColumn("month", month(col("date_col")))
.withColumn("day", dayofmonth(col("date_col")))
.drop("date_col")
.repartition(1)
.write.mode(SaveMode.Overwrite)
.partitionBy("year", "month", "day")
.parquet(outputFolder)
val sc = spark.sparkContext
val fs = FileSystem.get(sc.hadoopConfiguration)
val allTheFilesThatBennCreated: Array[FileStatus] = fs.globStatus(new Path(outputFolder + "/year=*/month=*/day=*/*"))
println("------- allTheFilesThatBennCreated -------" + allTheFilesThatBennCreated.mkString("Array(", ", ", ")"))
// right now the file paths look like outputFolder + "/year=2021/month=5/day=17/part-....c000.snappy.parquet"
// converting them to outputFolder + "/2021/5/17/part-....c000.snappy.parquet"
allTheFilesThatBennCreated.foreach(path => {
val newPathString = generateOutputFilePathString(path.getPath.toString)
val outputFilePath = new Path(newPathString)
val destinationFileSystem = FileSystem.get(outputFilePath.toUri, sc.hadoopConfiguration)
val sourceFileSystem = FileSystem.get(path.getPath.toUri, sc.hadoopConfiguration)
println("-------- source filesystem ------------------" + sourceFileSystem)
println("-------- path.getPath --------------" + path.getPath)
println("-------- destinationFileSystem ------------- " + destinationFileSystem)
println("-------- S3 path for Output File ------------" + outputFilePath)
// uploading to s3 from hdfs; the `true` flag deletes the HDFS source file after a successful copy
FileUtil.copy(sourceFileSystem, path.getPath, destinationFileSystem, outputFilePath, true, sc.hadoopConfiguration)
})
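(generateOutputFilePathString is not shown above; based on the comment in the loop, a minimal sketch of the transformation it could perform — dropping the year=/month=/day= partition prefixes — is below. The real helper may also rewrite the prefix to the destination bucket.)

// Hypothetical sketch only: turns ".../year=2021/month=5/day=17/part-...snappy.parquet"
// into ".../2021/5/17/part-...snappy.parquet" by dropping the partition-column prefixes.
def generateOutputFilePathString(sourcePath: String): String =
  sourcePath
    .replace("year=", "")
    .replace("month=", "")
    .replace("day=", "")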
But when I run the same code in spark-shell, or submit it as a jar with spark-submit, I get this error.
22/05/17 09:57:28 WARN LocalDirAllocator$AllocatorPerContext: /mnt/var/lib/hadoop/tmp/s3a is not writable
org.apache.hadoop.util.DiskChecker$DiskErrorException: Directory is not writable: /mnt/var/lib/hadoop/tmp/s3a
at org.apache.hadoop.util.DiskChecker.checkAccessByFileMethods(DiskChecker.java:167)
at org.apache.hadoop.util.DiskChecker.checkDirInternal(DiskChecker.java:100)
at org.apache.hadoop.util.DiskChecker.checkDir(DiskChecker.java:77)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:315)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:378)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:461)
at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
at org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:501)
at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:66)
at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:690)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1075)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1056)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:945)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:393)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:316)
at com.propellyr.driver.ApplicationMain$$anonfun$2.apply(ApplicationMain.scala:86)
at com.propellyr.driver.ApplicationMain$$anonfun$2.apply(ApplicationMain.scala:70)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at com.propellyr.driver.ApplicationMain$.delayedEndpoint$com$propellyr$driver$ApplicationMain$1(ApplicationMain.scala:70)
at com.propellyr.driver.ApplicationMain$delayedInit$body.apply(ApplicationMain.scala:7)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.propellyr.driver.ApplicationMain$.main(ApplicationMain.scala:7)
at com.propellyr.driver.ApplicationMain.main(ApplicationMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:863)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:938)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:947)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Exception in thread "main" org.apache.hadoop.util.DiskChecker$DiskErrorException: No space available in any of the local directories.
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:400)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:461)
at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
at org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:501)
at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:66)
at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:690)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1075)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1056)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:945)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:393)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:316)
at com.propellyr.driver.ApplicationMain$$anonfun$2.apply(ApplicationMain.scala:86)
at com.propellyr.driver.ApplicationMain$$anonfun$2.apply(ApplicationMain.scala:70)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at com.propellyr.driver.ApplicationMain$.delayedEndpoint$com$propellyr$driver$ApplicationMain$1(ApplicationMain.scala:70)
at com.propellyr.driver.ApplicationMain$delayedInit$body.apply(ApplicationMain.scala:7)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.propellyr.driver.ApplicationMain$.main(ApplicationMain.scala:7)
at com.propellyr.driver.ApplicationMain.main(ApplicationMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:863)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:938)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:947)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Does anyone know how to solve this?
You don't say whether your Zeppelin environment and your CLI submit to the same cluster, or whether the CLI runs in local mode.
However, the clue is in your stack trace:
No space available in any of the local directories.
Looking a bit further, the error happens in FileUtil.copy(), which tries to write its temporary output to the local paths configured by the property mapred.local.dir, which you can check.
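For what it's worth, the /mnt/var/lib/hadoop/tmp/s3a directory in the warning matches the default of fs.s3a.buffer.dir (${hadoop.tmp.dir}/s3a), which the S3A connector uses to buffer data before uploading. A minimal sketch, assuming that directory is what the allocator cannot write to, is to point it at a local directory the job can actually write to (the /tmp/s3a-buffer path below is just an example):

// Sketch only: redirect the S3A upload buffer to an assumed writable local directory
// before any FileSystem/FileUtil calls touch s3a://.
sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/s3a-buffer")

The same setting can also be passed at submit time with --conf spark.hadoop.fs.s3a.buffer.dir=/tmp/s3a-buffer.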