并行加载 S3 文件



我通过以下代码成功地将文件从 S3 加载到 Spark 中。它正在工作,但是我注意到 1 个文件和另一个文件之间存在延迟,并且它们是按顺序加载的。我想通过并行加载来改善这一点。

// Load files that were loaded into firehose on this day
var s3Files = spark.sqlContext.read.schema(schema).json("s3n://" + job.awsaccessKey + ":" + job.awssecretKey + "@" + job.bucketName + "/" + job.awss3RawFileExpression + "/" + year + "/" + monthCheck + "/" + dayCheck + "/*/").rdd
// Apply the schema to the RDD, here we will have duplicates
val usersDataFrame = spark.createDataFrame(s3Files, schema)
usersDataFrame.createOrReplaceTempView("results")
// Clean and use partition by the keys to eliminate duplicates and get latest record
var results = spark.sql(buildCleaningQuery(job, "results"))
results.createOrReplaceTempView("filteredResults")
val records = spark.sql("select count(*) from filteredResults")

我也尝试过通过textFile((方法加载,但是我在将RDD[String]转换为RDD[Row]时遇到了问题,因为之后我需要继续使用Spark SQL。我以以下方式使用它;

var s3Files = sparkContext.textFile("s3n://" + job.awsaccessKey + ":" + job.awssecretKey + "@" + job.bucketName + "/" + job.awss3RawFileExpression + "/" + year + "/" + monthCheck + "/" + dayCheck + "/*/").toJavaRDD()

将JSON文件(每个文件约50MB的多个文件(加载到Spark的理想方式是什么?我想根据架构验证属性,以便稍后能够 Spark SQL 查询来清理数据。

正在发生的事情是DataFrame被转换为RDD,然后再次转换为DataFrame,然后丢失分区信息。

var s3Files = spark
.sqlContext
.read.schema(schema)
.json(...)
.createOrRepla‌​ceTempView("results"‌​)

应该足够,并且分区信息应该仍然存在,允许同时加载 JSON 文件。

最新更新