如何在Databricks中迭代以读取存储在数据湖中不同子目录中的数百个文件?



我必须从 Azure Data Lake Gen2 读取 Databricks 中的数百个 avro 文件,从每个文件内的 Body 字段中提取数据,并在唯一的数据帧中连接所有提取的数据。关键是所有要读取的 avro 文件都存储在湖中的不同子目录中,遵循以下模式:

root/YYYY/MM/DD/HH/mm/ss.avro

这迫使我循环引入和选择数据。我正在使用这个 Python 代码,其中list_avro_files是所有文件的路径列表:

list_data = []
for file_avro in list_avro_files:
df = spark.read.format('avro').load(file_avro)
data1 = spark.read.json(df.select(df.Body.cast('string')).rdd.map(lambda x: x[0]))
list_data.append(data1)
data = reduce(DataFrame.unionAll, list_data)

有没有办法更有效地做到这一点?如何并行化/加快此过程?

只要你的list_avro_files可以通过标准的通配符语法来表达,你可能可以使用Spark自己的能力来并行化读取操作。您所需要的只是为 avro 文件指定basepath和文件名模式:

scala> var df = spark.read
.option("basepath","/user/hive/warehouse/root")
.format("avro")
.load("/user/hive/warehouse/root/*/*/*/*.avro")

而且,如果您发现需要确切知道任何给定行来自哪个文件,请使用内置函数来丰富input_file_name()数据帧:

scala> df = df.withColumn("source",input_file_name())

最新更新