我在S3存储桶中有一堆镶木地板文件。这些文件包含不同的列。我想阅读这些文件,只使用包含一些列的文件创建一个基准框架。
例如:假设我有三列";name"城市;以及";年";。我的一些文件只包含";名称和";城市";,其他包含";name"城市;以及";年";。如何通过排除不包含列"的文件来创建数据帧;年";。我正在使用spark和scala。欢迎任何帮助。
如何通过排除不包含列"的文件来创建数据帧;年";。
首先,我建议重新构建bucket,根据它们的模式来分离这些文件,或者更好的是有一个转换这些"文件"的过程;生的";文件转换为一个更易于使用的通用模式。
使用您所拥有的,从一些镶木地板文件开始:
val df1 = List(
("a", "b", "c")
).toDF("name", "city", "years")
df1.write.parquet("s3://{bucket}/test/a.parquet")
val df2 = List(
("aa", "bb")
).toDF("name", "city")
df2.write.parquet("s3://{bucket}/test/b.parquet")
val df3 = List(
("aaa", "bbb", "ccc")
).toDF("name", "city", "year")
df3.write.parquet("s3://{bucket}/test/c.parquet")
我们可以做到:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, input_file_name}
// determine the s3 paths of all the parquet files, to be read independantly
val s3Paths = spark
.read
.schema(schema)
.parquet("s3://{bucket}/test/")
.withColumn("filename", input_file_name())
.select("filename")
.collect()
.map(_(0).toString)
// individual DataFrames of each parquet file, where the `year` is not present
val dfs = s3Paths.flatMap {
path =>
val df = spark.read.parquet(path)
if (!df.columns.contains("year")) {
List(df)
} else {
List.empty[DataFrame]
}
}
// take the first DataFrame, and the rest
val (firstDFs, otherDFs) = (dfs.head, dfs.tail)
// combine all of the DataFrame, unioning the rows
otherDFs.foldLeft(firstDFs) {
case (acc, df) => acc.unionByName(df, allowMissingColumns = true)
}.show()
注:
在上面的例子中,当创建测试数据时:
df1.write.parquet("s3://{bucket}/test/a.parquet")
将在s3中创建一个文件,例如:
s3://{bucket}/test/a.parquet/part-0000-blah.parquet
为了创建这个示例,我将镶木地板文件向上移动到test/
路径中。