I am passing the input and output folders as parameters to a MapReduce word count program from a web page.
Getting the following error:
HTTP Status 500 - Request processing failed; nested exception is java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
The documentation shows the format: http://wiki.apache.org/hadoop/AmazonS3
s3n://ID:SECRET@BUCKET/Path
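For concreteness, that documented form would look something like the lines below; ACCESS_KEY, SECRET_KEY, and my-bucket are placeholder values, and a secret key that contains a slash typically has to be URL-encoded for this form to parse.

# Placeholder illustration of the documented s3n URL form (all values are hypothetical).
input_path = "s3n://ACCESS_KEY:SECRET_KEY@my-bucket/input/words.txt"
output_path = "s3n://ACCESS_KEY:SECRET_KEY@my-bucket/output/"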
I suggest you do it like this:
hadoop distcp \
  -Dfs.s3n.awsAccessKeyId=<your_access_id> \
  -Dfs.s3n.awsSecretAccessKey=<your_access_key> \
  s3n://origin hdfs://destinations
This also works as a workaround for slashes occurring in the key. The parameters with the id and access key must be supplied exactly in this order: after distcp and before origin.
Passing in the AWS credentials as part of the Amazon s3n URL is normally not recommended, for security reasons, especially if that code is pushed to a repository-hosting service (such as GitHub). Ideally, set the credentials in conf/core-site.xml as:
<configuration>
<property>
<name>fs.s3n.awsAccessKeyId</name>
<value>XXXXXX</value>
</property>
<property>
<name>fs.s3n.awsSecretAccessKey</name>
<value>XXXXXX</value>
</property>
</configuration>
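With the keys in core-site.xml, the job itself no longer needs credentials in code or in the URL. A minimal PySpark word-count sketch under that assumption (the bucket and paths are placeholders):

# Minimal word-count sketch; assumes fs.s3n.awsAccessKeyId / fs.s3n.awsSecretAccessKey
# are already set in conf/core-site.xml, so no credentials appear here or in the URL.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WordCount").getOrCreate()

lines = spark.sparkContext.textFile("s3n://<bucket-name>/input/")  # placeholder bucket/path
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.saveAsTextFile("s3n://<bucket-name>/output/")  # placeholder output path

spark.stop()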
Or re-install the awscli:
pip install awscli
For starters: download the hadoop-aws jar from https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws and put it into Spark's jars folder (an alternative that pulls the jar at runtime is sketched right below).
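A sketch of that runtime alternative, assuming Spark can reach Maven Central; the hadoop-aws version shown is an assumption and must match your Hadoop build:

# Pull hadoop-aws (and its transitive AWS SDK dependency) from Maven at startup
# instead of copying the jar into the jars folder by hand.
# The version below is an assumption; adjust it to your Hadoop version.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("S3Demo")
         .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.3")
         .getOrCreate())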
Then you can:
1. Hadoop configuration files
Export the credentials as environment variables (for example in your shell or in spark-env.sh):
export AWS_ACCESS_KEY_ID=<access-key>
export AWS_SECRET_ACCESS_KEY=<secret-key>
and register the S3 filesystem implementations in core-site.xml:
<configuration>
<property>
<name>fs.s3n.impl</name>
<value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3.S3FileSystem</value>
</property>
</configuration>
2. PySpark configuration
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", access_key)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", access_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", secret_key)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", secret_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
sc._jsc.hadoopConfiguration().set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3.S3FileSystem")
from pyspark.sql import SparkSession

if __name__ == "__main__":
    """
    Usage: S3 sample
    """
    access_key = '<access-key>'
    secret_key = '<secret-key>'

    spark = SparkSession \
        .builder \
        .appName("Demo") \
        .getOrCreate()
    sc = spark.sparkContext

    # Remove this block if you use core-site.xml and the environment variables instead.
    sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", access_key)
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", access_key)
    sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
    sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", secret_key)
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", secret_key)
    sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
    sc._jsc.hadoopConfiguration().set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3.S3FileSystem")

    # Fetch from S3; returns an RDD of lines.
    csv_rdd = spark.sparkContext.textFile("s3n://<bucket-name>/path/to/file.csv")
    c = csv_rdd.count()
    print("~~~~~~~~~~~~~~~~~~~~~count~~~~~~~~~~~~~~~~~~~~~")
    print(c)
    spark.stop()
Create a file core-site.xml and put it in the classpath. In the file, specify:
<configuration>
<property>
<name>fs.s3.awsAccessKeyId</name>
<value>your aws access key id</value>
<description>
aws s3 key id
</description>
</property>
<property>
<name>fs.s3.awsSecretAccessKey</name>
<value>your aws access key</value>
<description>
aws s3 key
</description>
</property>
</configuration>
Hadoop by default specifies two resources, loaded in order from the classpath:
- core-default.xml: read-only defaults for Hadoop
- core-site.xml: site-specific configuration for a given Hadoop installation
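A quick way to check whether the core-site.xml on the classpath was actually picked up (assuming an active SparkSession named spark; this prints None if the property was never loaded):

# Sanity check: read the property back from the live Hadoop configuration.
# `spark` is assumed to be an existing SparkSession.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
print(hadoop_conf.get("fs.s3.awsAccessKeyId"))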
Change s3 to s3n in your s3 URI.
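In PySpark terms the change is only in the URI scheme; s3n:// is served by NativeS3FileSystem, which understands the fs.s3n.* credential properties (the bucket and path below are placeholders):

# Before: rdd = sc.textFile("s3://my-bucket/path/to/input")
# After (placeholder bucket/path; `sc` is an existing SparkContext):
rdd = sc.textFile("s3n://my-bucket/path/to/input")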
hadoop distcp \
  -Dfs.s3a.access.key=<....> \
  -Dfs.s3a.secret.key=<....> \
  -Dfs.s3a.fast.upload=true \
  -update \
  s3a://path to file/ hdfs:///path/