试图从hdfs加载avro。我有大约 1000 个部分 avro 文件在目录中。我正在使用这个来阅读它们 -
val df = sqlContext.read.format("com.databricks.spark.avro").load("path/to/avro/dir") df.select("QUERY").take(50).foreach(println)
如果我在路径中只传递了 1 或 2 个 avro 文件,它就可以工作。但是,如果我传递包含 400+ 文件的目录,则会出现此错误。每个 avro 大约是 300mb。
org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem closed
at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:275)
at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:248)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:216)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at com.databricks.spark.avro.AvroRelation$$anonfun$buildScan$1$$anonfun$4$$anon$1.advanceNextRecord(AvroRelation.scala:157)
at com.databricks.spark.avro.AvroRelation$$anonfun$buildScan$1$$anonfun$4$$anon$1.hasNext(AvroRelation.scala:166)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:413)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
at scala.collection.AbstractIterator.to(Iterator.scala:1194)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:905)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:905)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:707)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:776)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:837)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.avro.mapred.FsInput.read(FsInput.java:46)
at org.apache.avro.file.DataFileReader$SeekableInputStream.read(DataFileReader.java:210)
at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.tryReadRaw(BinaryDecoder.java:839)
at org.apache.avro.io.BinaryDecoder.isEnd(BinaryDecoder.java:444)
at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:261)
... 36 more
这可能是因为目录中的某个 avro 文件未正确写入。可能是写入文件的过程突然停止。删除未正确写入的文件应该可以解决问题。