Spark + S3 + IAM Roles



我正在尝试使用 IAM 角色从 Spark 读取 s3 存储桶中的 csv 文件,但NoClassDefFoundErrorMultiObjectDeleteException

我已经安装了没有hadoop的Spark 2.4.4,并安装了hadoop 3.2.1以及hadoop-aws-3.2.1.jar和aws-java-sdk-1.11.655.jar。 我不得不安装一个没有 hadoop 的 Spark 版本,因为作为 Spark 构建一部分的 hadoop jars 是 2.7.3,它是 2016 年的。

sc.hadoopConfiguration.set("fs.s3a.credentialsType", "AssumeRole")
sc.hadoopConfiguration.set("fs.s3a.assumed.role.arn", "arn:aws:iam::[ROLE]")
val myRDD = sc.textFile("s3a://test_bucket/names.csv")
myRDD.count()

附加到角色的我的 IAM 策略具有以下内容

{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"s3:PutAccountPublicAccessBlock",
"s3:GetAccountPublicAccessBlock",
"s3:ListAllMyBuckets",
"s3:ListJobs",
"s3:CreateJob",
"s3:HeadBucket"
],
"Resource": "*"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": "s3:*",
"Resource": "arn:aws:s3:::test_bucket"
}
]
}

我什至尝试过sc.hadoopConfiguration.set("fs.s3a.multiobjectdelete.enable", "false")但错误如下:

java.lang.NoClassDefFoundError: com/amazonaws/services/s3/model/MultiObjectDeleteException
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2575)
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2540)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2636)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3269)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3301)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:268)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:239)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:325)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD.count(RDD.scala:1168)
... 49 elided
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.model.MultiObjectDeleteException
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 76 more

上述问题与 IAM 策略有关。 它没有查看所需文件"/*"的策略。

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::test_bucket"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::test_bucket/*"
]
}
]
}

您创建的角色将具有上述 IAM 策略。角色将附加到 EC2 实例(主实例和从属 AWS EC2 实例(,这一点至关重要,因为 Spark 将恢复分配给 EC2 实例的角色。 因此,由于 EC2 已分配角色,因此您无需在 scala 代码中指定角色。 您需要做的就是编写以下 Scala 代码来读取一个文件,该文件将恢复分配给 EC2 实例的角色。

val myRDD = sc.textFile("s3a://test_bucket/test.csv")
myRDD.count()

Hadoop-3.2.1.tar.gz 同时具有 hadoop-aws-3.2.1.jar 和 aws-java-sdk-bundle-1.11.375.jar位于/opt/hadoop/share/hadoop/tools/lib

这是您要确保已定义指向正确 jar 目录的 spark-env.sh,以便 spark 在类路径中加载 jar。

cp /opt/spark/conf/spark-env.sh.template /opt/spark/conf/spark-env.sh
export SPARK_DIST_CLASSPATH=/opt/spark/jars:/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/*:/opt/hadoop/share/hadoop/mapreduce/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/contrib/capacity-scheduler/*.jar:/opt/hadoop/share/hadoop/tools/lib/*

没有选项fs.s3a.credentialsType; 对于 s3a,一切都是小写的,这有助于调试这些东西,

有关代入角色凭据的文档涵盖了所需的权限 https://hadoop.apache.org/docs/r3.1.0/hadoop-aws/tools/hadoop-aws/assumed_roles.html

这在 hadoop 3.2 上的工作方式是,必须具有完全权限的调用,然后 s3a 连接器调用 STS AssumeRole 以在给定角色中创建一些短期会话凭据。在 EC2 中,虚拟机无权调用 AssumeRole(它们已经在角色中运行(,因此您必须使用创建虚拟机时使用的任何内容。

现在,请使用 s3a 代入角色的内容来查看策略允许的角色。

相关内容

  • 没有找到相关文章