如何读取镶木地板文件并只保留包含某些列的文件



我在S3存储桶中有一堆镶木地板文件。这些文件包含不同的列。我想阅读这些文件,只使用包含一些列的文件创建一个基准框架。

例如:假设我有三列";name"城市;以及";年";。我的一些文件只包含";名称和";城市";,其他包含";name"城市;以及";年";。如何通过排除不包含列"的文件来创建数据帧;年";。我正在使用spark和scala。欢迎任何帮助。

如何通过排除不包含列"的文件来创建数据帧;年";。

首先,我建议重新构建bucket,根据它们的模式来分离这些文件,或者更好的是有一个转换这些"文件"的过程;生的";文件转换为一个更易于使用的通用模式。


使用您所拥有的,从一些镶木地板文件开始:

val df1 = List(
("a", "b", "c")
).toDF("name", "city", "years")
df1.write.parquet("s3://{bucket}/test/a.parquet")
val df2 = List(
("aa", "bb")
).toDF("name", "city")
df2.write.parquet("s3://{bucket}/test/b.parquet")
val df3 = List(
("aaa", "bbb", "ccc")
).toDF("name", "city", "year")
df3.write.parquet("s3://{bucket}/test/c.parquet")

我们可以做到:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, input_file_name}
// determine the s3 paths of all the parquet files, to be read independantly
val s3Paths = spark
.read
.schema(schema)
.parquet("s3://{bucket}/test/")
.withColumn("filename", input_file_name())
.select("filename")
.collect()
.map(_(0).toString)
// individual DataFrames of each parquet file, where the `year` is not present
val dfs = s3Paths.flatMap {
path => 
val df = spark.read.parquet(path)
if (!df.columns.contains("year")) {
List(df)
} else {
List.empty[DataFrame]
}
}
// take the first DataFrame, and the rest
val (firstDFs, otherDFs) = (dfs.head, dfs.tail)
// combine all of the DataFrame, unioning the rows
otherDFs.foldLeft(firstDFs) {
case (acc, df) => acc.unionByName(df, allowMissingColumns = true)
}.show()

注:

在上面的例子中,当创建测试数据时:

df1.write.parquet("s3://{bucket}/test/a.parquet")

将在s3中创建一个文件,例如:

s3://{bucket}/test/a.parquet/part-0000-blah.parquet

为了创建这个示例,我将镶木地板文件向上移动到test/路径中。

相关内容

最新更新