Spark S3 CSV读取返回org.apache.hadoop.mapred.InvalidInputExcepti



我在这里和谷歌搜索org.apache.hadop.mapred.InvalidInputException时看到了几篇文章但大多数处理HDFS文件或捕获错误。我的问题是,虽然我可以从sparkshell中读取CSV文件,但从编译的JAR中运行它会不断返回org.apache.hadop.mapred.InvalidInputException错误。

罐子的大致过程:
1。从S3中的JSON文档中读取(这很有效)
2。从S3中的镶木地板文件读取(这也成功了)
3。将针对#1和#2的查询结果写入S3中的镶木地板文件(同样成功)
4。从写入#3的同一存储桶中读取配置csv文件。(此操作失败)

以下是我在代码中尝试过的各种方法:

1. val osRDD = spark.read.option("header","true").csv("s3://bucket/path/")
2. val osRDD = spark.read.format("com.databricks.spark.csv").option("header", "true").load("s3://bucket/path/")

上面两个具有s3、s3a和s3n前缀的变体在REPL中都可以正常工作,但在JAR中,它们返回以下内容:org.apache.hadoop.mapred.InvalidInputException:输入路径不存在:s3://bucket/path/eventsByOS.csv因此,它找到了文件,但无法读取。

认为这是一个权限问题,我尝试过:

a. export AWS_ACCESS_KEY_ID=<access key> and export AWS_SECRET_ACCESS_KEY=<secret> from the Linux prompt.  With Spark 2 this has been sufficient to provide us access to the S3 folders up until now.
b. .config("fs.s3.access.key", <access>)
.config("fs.s3.secret.key", <secret>)
.config("fs.s3n.access.key", <access>)
.config("fs.s3n.secret.key", <secret>)
.config("fs.s3a.access.key", <access>)
.config("fs.s3a.secret.key", <secret>)

在此失败之前,代码从位于同一个bucket中的镶木地板文件中读取,并将镶木地板写入同一bucket。CSV文件的大小只有4.8 KB。

有什么想法为什么会失败吗?

谢谢!

添加堆栈跟踪:

org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:253)
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:201)
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:281)
org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
scala.Option.getOrElse(Option.scala:121)
org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
scala.Option.getOrElse(Option.scala:121)
org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
scala.Option.getOrElse(Option.scala:121)
org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1332)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
org.apache.spark.rdd.RDD.take(RDD.scala:1326)
org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1367)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
org.apache.spark.rdd.RDD.first(RDD.scala:1366)
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.findFirstLine(CSVFileFormat.scala:206)
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:60)
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:184)
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:184)
scala.Option.orElse(Option.scala:289)
org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(DataSource.scala:183)
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:415)
org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:352)

当我将堆栈粘贴到IDE中时,没有任何结果,但我正在查看Hadoop的更新版本,目前无法切换到旧版本。

  1. 看看这些说明
  2. 这个landsat-gz文件实际上是一个CSV文件,您可以尝试在其中读取;它是我们通常用来测试的,因为它是免费的。首先看看你是否可以使用它
  3. 如果使用spark 2.0,请使用spark自己的CSV包
  4. 请使用S3a,不要使用其他

我通过为适当的方法添加特定的Hadoop配置来解决这个问题(这里的示例中为s3)。奇怪的是,除了读取CSV之外,上述安全性适用于Spark 2.0中的所有内容。

这段代码使用S3解决了我的问题。

spark.sparkContext.hadoopConfiguration.set("fs.s3.awsAccessKeyId", p.aws_accessKey)
spark.sparkContext.hadoopConfiguration.set("fs.s3.awsSecretAccessKey",p.aws_secretKey)

相关内容

最新更新