I'm new to PySpark DataFrames and have previously worked with RDDs. I have a dataframe like this:
date path
2017-01-01 /A/B/C/D
2017-01-01 /X
2017-01-01 /X/Y
and I want to transform it into:
date path
2017-01-01 /A/B
2017-01-01 /X
2017-01-01 /X/Y
Basically, I want to get rid of everything after the third /. So with RDDs I used to have the following:
from urllib import quote_plus

path_levels = df['path'].split('/')
filtered_path_levels = []
for _level in range(min(df_size, 3)):
    # Take only the top 2 levels of path
    filtered_path_levels.append(quote_plus(path_levels[_level]))
df['path'] = '/'.join(map(str, filtered_path_levels))
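Row by row, that logic amounts to the following plain-Python function (the name `truncate_path` is just illustrative, not from the original code):

```python
from urllib.parse import quote_plus  # Python 2: from urllib import quote_plus

def truncate_path(path, levels=2):
    # Keep only the first `levels` path components.
    # Splitting "/A/B/C/D" yields ['', 'A', 'B', 'C', 'D'], so the slice
    # needs levels + 1 elements to account for the empty string before
    # the leading "/".
    parts = path.split('/')
    return '/'.join(quote_plus(p) for p in parts[:levels + 1])

truncate_path("/A/B/C/D")  # → "/A/B"
truncate_path("/X")        # → "/X"
```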
Things are more complicated with PySpark, I'd say. This is what I have so far:
path_levels = split(results_df['path'], '/')
filtered_path_levels = []
for _level in range(size(df_size, 3)):
    # Take only the top 2 levels of path
    filtered_path_levels.append(quote_plus(path_levels[_level]))
df['path'] = '/'.join(map(str, filtered_path_levels))
This gives me the following error:
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
Any help regarding this would be greatly appreciated. Let me know if more information/explanation is needed.
Use udf:
from urllib import quote_plus  # Python 3: from urllib.parse import quote_plus
from pyspark.sql.functions import udf, lit

@udf
def quote_string_(path, size):
    if path:
        return "/".join(quote_plus(x) for x in path.split("/")[:size])

df.withColumn("foo", quote_string_("path", lit(2)))
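The udf's Python body can be sanity-checked outside Spark. Note that because the leading slash makes the first split element an empty string, `size` counts that empty element too, so `size=3` is what keeps two path levels:

```python
from urllib.parse import quote_plus  # Python 2: from urllib import quote_plus

def quote_string(path, size):
    # Same body as the udf above, runnable without Spark.
    if path:
        return "/".join(quote_plus(x) for x in path.split("/")[:size])

quote_string("/A/B/C/D", 2)  # → "/A"  (empty element + first level)
quote_string("/A/B/C/D", 3)  # → "/A/B"
```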
I solved my problem using the following code:
from pyspark.sql.functions import split, col, lit, concat

split_col = split(df['path'], '/')
# getItem(0) is the empty string before the leading '/', so levels start at index 1
df = df.withColumn('l1_path', split_col.getItem(1))
df = df.withColumn('l2_path', split_col.getItem(2))
# note: for single-level paths like /X, getItem(2) is null and concat propagates the null
df = df.withColumn('path', concat(col('l1_path'), lit('/'), col('l2_path')))
df = df.drop('l1_path', 'l2_path')
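An alternative (not from the answers above, just a sketch) is `pyspark.sql.functions.regexp_extract`, which avoids the intermediate columns, e.g. `df.withColumn('path', regexp_extract('path', PATTERN, 1))`. The pattern itself can be verified in plain Python:

```python
import re

# Assumed pattern: leading slash plus up to two path levels.
PATTERN = r"^(/[^/]*(?:/[^/]*)?)"

def first_two_levels(path):
    # Mirrors what regexp_extract would capture for group 1.
    m = re.match(PATTERN, path)
    return m.group(1) if m else path

first_two_levels("/A/B/C/D")  # → "/A/B"
first_two_levels("/X")        # → "/X"
```

Unlike this sketch, `regexp_extract` returns an empty string (not the original value) when the pattern does not match, which matters if any path lacks the leading slash.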