使用 SQL 类 IN 子句过滤 Pyspark 数据帧



我想用类似SQL的IN子句过滤Pyspark DataFrame,如

sc = SparkContext()
sqlc = SQLContext(sc)
df = sqlc.sql('SELECT * from my_df WHERE field1 IN a')

其中a是元组(1, 2, 3)。我收到此错误:

java.lang.RuntimeException: [1.67] 失败: "("预期,但找到标识符 a

这基本上是说它期待类似"(1,2,3)"而不是A的东西。问题是我无法手动将值写入 a 中,因为它是从另一个作业中提取的。

在这种情况下,我将如何过滤?

在 SQL 环境范围内评估SQLContext传递给它的字符串。它不会捕获闭包。如果你想传递一个变量,你必须使用字符串格式显式地这样做:

df = sc.parallelize([(1, "foo"), (2, "x"), (3, "bar")]).toDF(("k", "v"))
df.registerTempTable("df")
sqlContext.sql("SELECT * FROM df WHERE v IN {0}".format(("foo", "bar"))).count()
##  2 

显然,出于安全考虑,这不是您在"真正的"SQL环境中使用的东西,但在这里无关紧要。

实际上DataFrame当您想要创建动态查询时,DSL 是一个更好的选择:

from pyspark.sql.functions import col
df.where(col("v").isin({"foo", "bar"})).count()
## 2

它很容易构建和组合,并为您处理HiveQL/Spark SQL的所有细节。

重申@zero323上面提到的内容:我们也可以使用列表(不仅是set)做同样的事情,如下所示

from pyspark.sql.functions import col
df.where(col("v").isin(["foo", "bar"])).count()

只是一点点添加/更新:

choice_list = ["foo", "bar", "jack", "joan"]

如果要筛选数据帧"df",以便仅choice_list保留基于列"v"的行,则

from pyspark.sql.functions import col
df_filtered = df.where( ( col("v").isin (choice_list) ) )

您也可以对整数列执行此操作:

df_filtered = df.filter("field1 in (1,2,3)")

或者对于字符串列:

df_filtered = df.filter("field1 in ('a','b','c')")

一种对我有用的稍微不同的方法是使用自定义过滤器功能进行过滤。

def filter_func(a):
"""wrapper function to pass a in udf"""
    def filter_func_(col):
    """filtering function"""
        if col in a.value:
            return True
    return False
return udf(filter_func_, BooleanType())
# Broadcasting allows to pass large variables efficiently
a = sc.broadcast((1, 2, 3))
df = my_df.filter(filter_func(a)(col('field1'))) 
from pyspark.sql import SparkSession
import pandas as pd
spark=SparkSession.builder.appName('Practise').getOrCreate()
df_pyspark=spark.read.csv('datasets/myData.csv',header=True,inferSchema=True)
df_spark.createOrReplaceTempView("df") # we need to create a Temp table first
spark.sql("SELECT * FROM df where Departments in ('IOT','Big Data') order by Departments").show()

最新更新