我们将在 AWS 上运行一个 EMR 集群(带有竞价型实例),运行在 S3 存储桶之上。数据将以 ORC 格式存储在此存储桶中。但是,我们希望使用 R 以及某种沙盒环境,读取相同的数据。
我已经让软件包 aws.s3(cloudyr)正常运行:我可以毫无问题地读取 csv 文件,但它似乎不允许我将 orc 文件转换为可读的内容。
我在网上喜欢的两个选项是 - 斯帕克· - 数据连接器(Vertica)
由于在Windows机器上安装dataconnector是很麻烦的,我安装了SparkR,现在我能够读取本地orc.file(我的机器上的R本地,我的机器上的orc文件本地)。但是,如果我尝试 read.orc,默认情况下它会规范化我到本地路径的路径。深入研究源代码,我运行了以下内容:
sparkSession <- SparkR:::getSparkSession()
options <- SparkR:::varargsToStrEnv()
read <- SparkR:::callJMethod(sparkSession, "read")
read <- SparkR:::callJMethod(read, "options", options)
sdf <- SparkR:::handledCallJMethod(read, "orc", my_path)
但是我得到了以下错误:
Error: Error in orc : java.io.IOException: No FileSystem for scheme: https
有人可以帮助我解决这个问题或指出从 S3 加载 orc 文件的替代方法吗?
编辑答案:现在您可以直接从 S3 读取,而不是先从本地文件系统下载和读取
应约瑟夫先生的要求:通过SparkR的可能解决方案(首先我不想这样做)。
# Set the System environment variable to where Spark is installed
Sys.setenv(SPARK_HOME="pathToSpark")
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "org.apache.hadoop:hadoop-aws:2.7.1" "sparkr-shell"')
# Set the library path to include path to SparkR
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R","lib"), .libPaths()))
# Set system environments to be able to load from S3
Sys.setenv("AWS_ACCESS_KEY_ID" = "myKeyID", "AWS_SECRET_ACCESS_KEY" = "myKey", "AWS_DEFAULT_REGION" = "myRegion")
# load required packages
library(aws.s3)
library(SparkR)
## Create a spark context and a sql context
sc<-sparkR.init(master = "local")
sqlContext<-sparkRSQL.init(sc)
# Set path to file
path <- "s3n://bucketname/filename.orc"
# Set hadoop configuration
hConf = SparkR:::callJMethod(sc, "hadoopConfiguration")
SparkR:::callJMethod(hConf, "set", "fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
SparkR:::callJMethod(hConf, "set", "fs.s3n.awsAccessKeyId", "myAccesKey")
SparkR:::callJMethod(hConf, "set", "fs.s3n.awsSecretAccessKey", "mySecrectKey")
# Slight adaptation to read.orc function
sparkSession <- SparkR:::getSparkSession()
options <- SparkR:::varargsToStrEnv()
# Not required: path <- normalizePath(path)
read <- SparkR:::callJMethod(sparkSession, "read")
read <- SparkR:::callJMethod(read, "options", options)
sdf <- SparkR:::handledCallJMethod(read, "orc", path)
temp <- SparkR:::dataFrame(sdf)
# Read first lines
head(temp)