在Databricks(DBFS)中递归地列出目录和子目录的文件



使用python/dbutils,如何显示当前目录的文件&Databricks文件系统(DBFS(中递归的子目录。

dbutils.fs.ls(和%fs-magic命令(令人惊讶的是,它似乎不支持任何递归切换。然而,由于ls函数返回一个FileInfo对象列表,因此递归迭代它们以获得整个内容是非常简单的,例如:

def get_dir_content(ls_path):
dir_paths = dbutils.fs.ls(ls_path)
subdir_paths = [get_dir_content(p.path) for p in dir_paths if p.isDir() and p.path != ls_path]
flat_subdir_paths = [p for subdir in subdir_paths for p in subdir]
return list(map(lambda p: p.path, dir_paths)) + flat_subdir_paths

paths = get_dir_content('/databricks-datasets/COVID/CORD-19/2020-03-13')
[print(p) for p in paths]

可以使用生成器和yield运算符来完成另一种实现。对于yield from运算符,您必须至少使用Python 3.3+,并查看此伟大的帖子以更好地理解yield运算符:

def get_dir_content(ls_path):
for dir_path in dbutils.fs.ls(ls_path):
if dir_path.isFile():
yield dir_path.path
elif dir_path.isDir() and ls_path != dir_path.path:
yield from get_dir_content(dir_path.path)

list(get_dir_content('/databricks-datasets/COVID/CORD-19/2020-03-13'))

您也可以尝试这个递归函数:

def lsR(path):
return [
fname
for flist in [
([fi.path] if fi.isFile() else lsR(fi.path))
for fi in dbutils.fs.ls(path)
]
for fname in flist
]

lsR("/your/folder")

这里列出了其他答案,但值得注意的是,databricks将数据集存储为文件夹。

例如,您可能有一个名为my_dataset_here的"目录",其中包含以下文件:

my_dataset_here/part-00193-111-c845-4ce6-8714-123-c000.snappy.parquet
my_dataset_here/part-00193-123-c845-4ce6-8714-123-c000.snappy.parquet
my_dataset_here/part-00193-222-c845-4ce6-8714-123-c000.snappy.parquet
my_dataset_here/part-00193-444-c845-4ce6-8714-123-c000.snappy.parquet
...

在一组典型的表中,将有数千个这样的文件

试图枚举这样一个文件夹中的每个文件可能需要很长时间。。。例如,分钟,因为对dbutils.fs.ls的单个调用必须返回每个结果的数组。

因此,一种天真的方法,如:

stack = ["/databricks-datasets/COVID/CORD-19/2020-03-13"]
while len(stack) > 0:
current_folder = stack.pop(0)
for file in dbutils.fs.ls(current_folder):
if file.isDir():
stack.append(file.path)
print(file.path)
else:
print(file.path)

确实会列出每个文件,但也需要很长时间才能完成。在我的测试环境中,枚举50多个表需要8分钟

但是,如果使用新的"delta"格式,则会在delta表文件夹中创建一个名为"_delta_log"的标准命名文件夹。

因此,在尝试枚举文件夹的全部内容之前,我们可以修改代码来检查每个文件夹,看看它是否是一个数据集:

stack = ["/databricks-datasets/COVID/CORD-19/2020-03-13"]
while len(stack) > 0:
current_folder = stack.pop(0)
for file in dbutils.fs.ls(current_folder):
if file.isDir():
# Check if this is a delta table and do not recurse if so!
try:
delta_check_path = f"{file.path}/_delta_log"
dbutils.fs.ls(delta_check_path)  # raises an exception if missing
print(f"dataset: {file.path}")
except:            
stack.append(file.path)
print(f"folder: {file.path}")
else:
print(f"file: {file.path}")

此代码在相同的测试环境中运行38秒

在琐碎的情况下,天真的解决方案是可以接受的,但在现实世界中,它很快就会变得完全不可接受。

请注意,此代码将仅在delta表上运行;如果你使用的是parquet/csv/任何格式,那你就倒霉了。

最新更新