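In a PySpark DataFrame, I want to expand a string column containing a full file path into multiple rows, one for each parent path.
Input DataFrame value: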
ParentFolder/Folder1/Folder2/Folder3/Folder4/TestFile.txt
Output: each row should contain one path prefix, including the trailing / separator:
ParentFolder/
ParentFolder/Folder1/
ParentFolder/Folder1/Folder2/
ParentFolder/Folder1/Folder2/Folder3/
ParentFolder/Folder1/Folder2/Folder3/Folder4/
ParentFolder/Folder1/Folder2/Folder3/Folder4/TestFile.txt
You can use substring_index like this:
df2 = df.selectExpr("""
    explode(
        transform(
            sequence(1, size(split(col, '/'))),
            (x, i) -> case when i = size(split(col, '/')) - 1
                           then col
                           else substring_index(col, '/', x) || '/'
                      end
        )
    ) as col
""")
df2.show(20,0)
+---------------------------------------------------------+
|col |
+---------------------------------------------------------+
|ParentFolder/ |
|ParentFolder/Folder1/ |
|ParentFolder/Folder1/Folder2/ |
|ParentFolder/Folder1/Folder2/Folder3/ |
|ParentFolder/Folder1/Folder2/Folder3/Folder4/ |
|ParentFolder/Folder1/Folder2/Folder3/Folder4/TestFile.txt|
+---------------------------------------------------------+
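To see why the expression works, its logic can be mirrored in plain Python without a Spark session. This is only a sketch: `substring_index` below is a rough stand-in for the Spark SQL function (positive counts only), and `parent_paths` is a hypothetical helper name, not part of the answer.

```python
def substring_index(s: str, delim: str, count: int) -> str:
    """Rough stand-in for Spark SQL's substring_index (positive count only):
    everything before the count-th occurrence of delim."""
    return delim.join(s.split(delim)[:count])


def parent_paths(path: str, delim: str = "/") -> list:
    """Mirror of the transform(sequence(...), (x, i) -> ...) expression."""
    n = len(path.split(delim))
    out = []
    # x runs over sequence(1, n); i is the zero-based index of x.
    for i, x in enumerate(range(1, n + 1)):
        if i == n - 1:
            # Last element: keep the full path, with no trailing delimiter.
            out.append(path)
        else:
            out.append(substring_index(path, delim, x) + delim)
    return out


if __name__ == "__main__":
    for p in parent_paths("ParentFolder/Folder1/Folder2/Folder3/Folder4/TestFile.txt"):
        print(p)
```

The `case when` branch exists only so that the last row is the file path itself rather than the file path followed by a stray separator.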
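You can split the value column on the / separator to get all parts of the path. Then apply the transform function to the split result, using the slice and array_join functions to build each parent path: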
from pyspark.sql import functions as F

# Parentheses let the method chain span several lines.
df1 = (
    df.withColumn("value", F.split(F.col("value"), "/"))
    .selectExpr("""
        explode(
            transform(value,
                (x, i) -> struct(i+1 as rn, array_join(slice(value, 1, i+1), '/') ||
                                 IF(i+1 < size(value), '/', '') as path)
            )
        ) as paths
    """)
    .select("paths.*")
)

df1.show(truncate=False)
#+---+---------------------------------------------------------+
#|rn |path |
#+---+---------------------------------------------------------+
#|1 |ParentFolder/ |
#|2 |ParentFolder/Folder1/ |
#|3 |ParentFolder/Folder1/Folder2/ |
#|4 |ParentFolder/Folder1/Folder2/Folder3/ |
#|5 |ParentFolder/Folder1/Folder2/Folder3/Folder4/ |
#|6 |ParentFolder/Folder1/Folder2/Folder3/Folder4/TestFile.txt|
#+---+---------------------------------------------------------+
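The slice and array_join steps can also be traced in plain Python. This is a sketch under assumptions: `slice_1based`, `array_join`, and `paths_with_rank` are hypothetical stand-ins written for illustration, and the main point is that Spark's slice takes a 1-based start and a length, unlike Python slicing.

```python
def slice_1based(arr: list, start: int, length: int) -> list:
    """Stand-in for Spark SQL slice(arr, start, length) with a positive,
    1-based start."""
    return arr[start - 1 : start - 1 + length]


def array_join(arr: list, delim: str) -> str:
    """Stand-in for Spark SQL array_join on an array of non-null strings."""
    return delim.join(arr)


def paths_with_rank(value: str) -> list:
    """Return (rn, path) tuples, mirroring the struct built inside transform."""
    parts = value.split("/")
    rows = []
    for i in range(len(parts)):
        path = array_join(slice_1based(parts, 1, i + 1), "/")
        # Append the separator to every row except the last (the file itself).
        if i + 1 < len(parts):
            path += "/"
        rows.append((i + 1, path))
    return rows


if __name__ == "__main__":
    for rn, path in paths_with_rank("ParentFolder/Folder1/TestFile.txt"):
        print(rn, path)
```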
For Spark < 2.4, you can use a UDF like this:
import os

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType


def get_all_paths(path: str):
    # Start with the full path, then strip one trailing component per "/".
    paths = [path]
    for _ in range(path.count("/")):
        path, base = os.path.split(path)
        paths.append(path + "/")
    # Parents were collected longest-first; reverse to go from root to file.
    return list(reversed(paths))


decompose_path = F.udf(get_all_paths, ArrayType(StringType()))
df1 = df.select(F.explode(decompose_path(F.col("value"))).alias("paths"))
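Since the UDF body is ordinary Python, it can be checked without Spark. The sketch below is a hypothetical split-based variant (`get_all_paths_split` is not from the answer). It is offered because `os.path.split("/a")` returns `("/", "a")`, so the loop above would appear to emit `//` as the first entry for a path that starts with a slash. Inputs with a trailing slash are not treated specially here.

```python
def get_all_paths_split(path: str, sep: str = "/") -> list:
    """Hypothetical variant: build each prefix by joining the split parts."""
    parts = path.split(sep)
    last = len(parts) - 1
    paths = []
    for i in range(len(parts)):
        prefix = sep.join(parts[: i + 1])
        # Every prefix except the full path gets a trailing separator.
        paths.append(prefix if i == last else prefix + sep)
    return paths


if __name__ == "__main__":
    # Relative path, as in the question.
    print(get_all_paths_split("ParentFolder/Folder1/TestFile.txt"))
    # Path with a leading slash: the first entry is the root itself.
    print(get_all_paths_split("/data/TestFile.txt"))
```

Assuming it behaves as intended, it could be registered the same way as above, with F.udf(get_all_paths_split, ArrayType(StringType())).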