我正在尝试使用列表过滤pyspark中的数据框。我想根据列表进行过滤,或者只包含列表中有值的记录。下面的代码不起作用:
# define a dataframe
rdd = sc.parallelize([(0,1), (0,1), (0,2), (1,2), (1,10), (1,20), (3,18), (3,18), (3,18)])
df = sqlContext.createDataFrame(rdd, ["id", "score"])
# define a list of scores
l = [10,18,20]
# filter out records by scores by list l
records = df.filter(df.score in l)
# expected: (0,1), (0,1), (0,2), (1,2)
# include only records with these scores in list l
records = df.where(df.score in l)
# expected: (1,10), (1,20), (3,18), (3,18), (3,18)
给出以下错误:ValueError:无法将列转换为bool:在构建DataFrame布尔表达式时,请使用'&'表示'and', '|'表示'or', '~'表示'not'。
显示的是"df。得分在l"不能求值,因为df。Score给出了一个列和"in"。未在该列类型上定义,请使用"isin"
代码应该像这样:
# define a dataframe
rdd = sc.parallelize([(0,1), (0,1), (0,2), (1,2), (1,10), (1,20), (3,18), (3,18), (3,18)])
df = sqlContext.createDataFrame(rdd, ["id", "score"])
# define a list of scores
l = [10,18,20]
# filter out records by scores by list l
records = df.filter(~df.score.isin(l))
# expected: (0,1), (0,1), (0,2), (1,2)
# include only records with these scores in list l
df.filter(df.score.isin(l))
# expected: (1,10), (1,20), (3,18), (3,18), (3,18)
注意where()
是filter()
的别名,所以两者是可互换的。
基于@user3133475的答案,也可以像这样从col()
调用isin()
函数:
from pyspark.sql.functions import col
l = [10,18,20]
df.filter(col("score").isin(l))
对于大数据帧,我发现join
的实现比where
要快得多:
def filter_spark_dataframe_by_list(df, column_name, filter_list):
""" Returns subset of df where df[column_name] is in filter_list """
spark = SparkSession.builder.getOrCreate()
filter_df = spark.createDataFrame(filter_list, df.schema[column_name].dataType)
return df.join(filter_df, df[column_name] == filter_df["value"])