Convert a full file path into multiple rows of parent absolute paths in PySpark



In a PySpark DataFrame, I want to convert a string column holding a full file path into multiple rows, one per parent path.

Input DataFrame value:

ParentFolder/Folder1/Folder2/Folder3/Folder4/TestFile.txt

Output: each row should show one absolute path, using / as the separator

ParentFolder/
ParentFolder/Folder1/
ParentFolder/Folder1/Folder2/
ParentFolder/Folder1/Folder2/Folder3/
ParentFolder/Folder1/Folder2/Folder3/Folder4/
ParentFolder/Folder1/Folder2/Folder3/Folder4/TestFile.txt

You can use substring_index like this:

df2 = df.selectExpr("""
    explode(
        transform(
            sequence(1, size(split(col, '/'))),
            (x, i) -> case when i = size(split(col, '/')) - 1
                           then col
                           else substring_index(col, '/', x) || '/'
                      end
        )
    ) as col
""")
df2.show(20, truncate=False)
+---------------------------------------------------------+
|col                                                      |
+---------------------------------------------------------+
|ParentFolder/                                            |
|ParentFolder/Folder1/                                    |
|ParentFolder/Folder1/Folder2/                            |
|ParentFolder/Folder1/Folder2/Folder3/                    |
|ParentFolder/Folder1/Folder2/Folder3/Folder4/            |
|ParentFolder/Folder1/Folder2/Folder3/Folder4/TestFile.txt|
+---------------------------------------------------------+
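For reference, Spark SQL's substring_index(str, delim, count) with a positive count returns everything before the count-th occurrence of the delimiter, which is why appending '/' rebuilds each parent path. A rough pure-Python equivalent (positive counts only, names chosen for illustration):

```python
def substring_index(s: str, delim: str, count: int) -> str:
    # Keep the first `count` delimiter-separated parts and rejoin them,
    # mimicking Spark SQL's substring_index for positive counts.
    return delim.join(s.split(delim)[:count])

print(substring_index("ParentFolder/Folder1/Folder2", "/", 2))
# ParentFolder/Folder1
```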

You can split the column value on the / separator to get all the parts of the path, then use the transform function on the split result, building each parent path with the slice and array_join functions:

from pyspark.sql import functions as F
df1 = df.withColumn("value", F.split(F.col("value"), "/")) \
    .selectExpr("""
        explode(
            transform(value,
                (x, i) -> struct(
                    i + 1 as rn,
                    array_join(slice(value, 1, i + 1), '/') ||
                        IF(i + 1 < size(value), '/', '') as path
                )
            )
        ) as paths
    """).select("paths.*")

df1.show(truncate=False)
#+---+---------------------------------------------------------+
#|rn |path                                                     |
#+---+---------------------------------------------------------+
#|1  |ParentFolder/                                            |
#|2  |ParentFolder/Folder1/                                    |
#|3  |ParentFolder/Folder1/Folder2/                            |
#|4  |ParentFolder/Folder1/Folder2/Folder3/                    |
#|5  |ParentFolder/Folder1/Folder2/Folder3/Folder4/            |
#|6  |ParentFolder/Folder1/Folder2/Folder3/Folder4/TestFile.txt|
#+---+---------------------------------------------------------+
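Per row, the transform/slice/array_join combination boils down to prefix joins over the split parts. A pure-Python sketch of that logic, using a hypothetical shorter parts list:

```python
# Hypothetical result of split(value, '/') for one row.
parts = ["ParentFolder", "Folder1", "TestFile.txt"]

# For each index i: row number i+1, join the first i+1 parts with '/',
# and append a trailing '/' unless this is the full (last) path.
paths = [
    (i + 1, "/".join(parts[: i + 1]) + ("/" if i + 1 < len(parts) else ""))
    for i in range(len(parts))
]
print(paths)
# [(1, 'ParentFolder/'), (2, 'ParentFolder/Folder1/'), (3, 'ParentFolder/Folder1/TestFile.txt')]
```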

For Spark < 2.4, you can use a UDF like this:

import os
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

def get_all_paths(path: str):
    # Start with the full path, then repeatedly strip the last component,
    # collecting each parent directory with a trailing '/'.
    paths = [path]
    for _ in range(path.count("/")):
        path, base = os.path.split(path)
        paths.append(path + "/")
    return list(reversed(paths))

decompose_path = F.udf(get_all_paths, ArrayType(StringType()))
df1 = df.select(F.explode(decompose_path(F.col("value"))).alias("paths"))
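The helper itself can be sanity-checked without a Spark session (the function is repeated here, with a hypothetical shorter path, so the snippet runs on its own):

```python
import os

# Same pure-Python helper as above: builds every parent path of a file path.
def get_all_paths(path: str):
    paths = [path]
    for _ in range(path.count("/")):
        path, base = os.path.split(path)
        paths.append(path + "/")
    return list(reversed(paths))

print(get_all_paths("ParentFolder/Folder1/TestFile.txt"))
# ['ParentFolder/', 'ParentFolder/Folder1/', 'ParentFolder/Folder1/TestFile.txt']
```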
