使用python/dbutils,如何显示当前目录的文件&Databricks文件系统(DBFS(中递归的子目录。
dbutils.fs.ls(和%fs-magic命令(令人惊讶的是,它似乎不支持任何递归切换。然而,由于ls函数返回一个FileInfo对象列表,因此递归迭代它们以获得整个内容是非常简单的,例如:
def get_dir_content(ls_path):
dir_paths = dbutils.fs.ls(ls_path)
subdir_paths = [get_dir_content(p.path) for p in dir_paths if p.isDir() and p.path != ls_path]
flat_subdir_paths = [p for subdir in subdir_paths for p in subdir]
return list(map(lambda p: p.path, dir_paths)) + flat_subdir_paths
paths = get_dir_content('/databricks-datasets/COVID/CORD-19/2020-03-13')
[print(p) for p in paths]
可以使用生成器和yield
运算符来完成另一种实现。对于yield from
运算符,您必须至少使用Python 3.3+
,并查看此伟大的帖子以更好地理解yield
运算符:
def get_dir_content(ls_path):
for dir_path in dbutils.fs.ls(ls_path):
if dir_path.isFile():
yield dir_path.path
elif dir_path.isDir() and ls_path != dir_path.path:
yield from get_dir_content(dir_path.path)
list(get_dir_content('/databricks-datasets/COVID/CORD-19/2020-03-13'))
您也可以尝试这个递归函数:
def lsR(path):
return [
fname
for flist in [
([fi.path] if fi.isFile() else lsR(fi.path))
for fi in dbutils.fs.ls(path)
]
for fname in flist
]
lsR("/your/folder")
这里列出了其他答案,但值得注意的是,databricks将数据集存储为文件夹。
例如,您可能有一个名为my_dataset_here
的"目录",其中包含以下文件:
my_dataset_here/part-00193-111-c845-4ce6-8714-123-c000.snappy.parquet
my_dataset_here/part-00193-123-c845-4ce6-8714-123-c000.snappy.parquet
my_dataset_here/part-00193-222-c845-4ce6-8714-123-c000.snappy.parquet
my_dataset_here/part-00193-444-c845-4ce6-8714-123-c000.snappy.parquet
...
在一组典型的表中,将有数千个这样的文件。
试图枚举这样一个文件夹中的每个文件可能需要很长时间。。。例如,分钟,因为对dbutils.fs.ls
的单个调用必须返回每个结果的数组。
因此,一种天真的方法,如:
stack = ["/databricks-datasets/COVID/CORD-19/2020-03-13"]
while len(stack) > 0:
current_folder = stack.pop(0)
for file in dbutils.fs.ls(current_folder):
if file.isDir():
stack.append(file.path)
print(file.path)
else:
print(file.path)
确实会列出每个文件,但也需要很长时间才能完成。在我的测试环境中,枚举50多个表需要8分钟。
但是,如果使用新的"delta"格式,则会在delta表文件夹中创建一个名为"_delta_log"的标准命名文件夹。
因此,在尝试枚举文件夹的全部内容之前,我们可以修改代码来检查每个文件夹,看看它是否是一个数据集:
stack = ["/databricks-datasets/COVID/CORD-19/2020-03-13"]
while len(stack) > 0:
current_folder = stack.pop(0)
for file in dbutils.fs.ls(current_folder):
if file.isDir():
# Check if this is a delta table and do not recurse if so!
try:
delta_check_path = f"{file.path}/_delta_log"
dbutils.fs.ls(delta_check_path) # raises an exception if missing
print(f"dataset: {file.path}")
except:
stack.append(file.path)
print(f"folder: {file.path}")
else:
print(f"file: {file.path}")
此代码在相同的测试环境中运行38秒。
在琐碎的情况下,天真的解决方案是可以接受的,但在现实世界中,它很快就会变得完全不可接受。
请注意,此代码将仅在delta表上运行;如果你使用的是parquet/csv/任何格式,那你就倒霉了。