根据指定denylist条件的另一个DataFrame筛选Spark DataFrame



我有一个largeDataFrame(多列和数十亿行)和一个smallDataFrame(单列和10,000行)。

largeDataFrame中的some_identifier列与smallDataFrame中的一行匹配时,我想过滤largeDataFrame中的所有行。

下面是一个例子:

largeDataFrame

some_idenfitier,first_name
111,bob
123,phil
222,mary
456,sue

smallDataFrame

some_identifier
123
456

desiredOutput

111,bob
222,mary

这是我丑陋的解决方案。

val smallDataFrame2 = smallDataFrame.withColumn("is_bad", lit("bad_row"))
val desiredOutput = largeDataFrame.join(broadcast(smallDataFrame2), Seq("some_identifier"), "left").filter($"is_bad".isNull).drop("is_bad")

有更干净的解决方案吗?

在本例中需要使用left_anti连接。

左反连接左半连接相反。

根据给定的键从左表中过滤出右表中的数据:

largeDataFrame
   .join(smallDataFrame, Seq("some_identifier"),"left_anti")
   .show
// +---------------+----------+
// |some_identifier|first_name|
// +---------------+----------+
// |            222|      mary|
// |            111|       bob|
// +---------------+----------+

纯Spark SQL版本(以PySpark为例,但有一些小变化)同样适用于Scala API):

def string_to_dataframe (df_name, csv_string):
    rdd = spark.sparkContext.parallelize(csv_string.split("n"))
    df = spark.read.option('header', 'true').option('inferSchema','true').csv(rdd)
    df.registerTempTable(df_name)
string_to_dataframe("largeDataFrame", '''some_identifier,first_name
111,bob
123,phil
222,mary
456,sue''')
string_to_dataframe("smallDataFrame", '''some_identifier
123
456
''')
anti_join_df = spark.sql("""
    select * 
    from largeDataFrame L
    where NOT EXISTS (
            select 1 from smallDataFrame S
            WHERE L.some_identifier = S.some_identifier
        )
""")
print(anti_join_df.take(10))
anti_join_df.explain()

将输出Mary和bob:

[行(some_identifier = 222, first_name ="玛丽"),
行(some_identifier = 111, first_name ="鲍勃")]

物理执行计划将显示它正在使用

== Physical Plan ==
SortMergeJoin [some_identifier#252], [some_identifier#264], LeftAnti
:- *(1) Sort [some_identifier#252 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(some_identifier#252, 200)
:     +- Scan ExistingRDD[some_identifier#252,first_name#253]
+- *(3) Sort [some_identifier#264 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(some_identifier#264, 200)
      +- *(2) Project [some_identifier#264]
         +- Scan ExistingRDD[some_identifier#264]

注意Sort Merge Join对于连接/反连接大约相同大小的数据集更有效。既然你已经提到小数据帧更小,我们应该确保Spark优化器选择Broadcast Hash Join,这将在这种情况下更有效:

我将NOT EXISTS改为NOT IN子句:

anti_join_df = spark.sql("""
    select * 
    from largeDataFrame L
    where L.some_identifier NOT IN (
            select S.some_identifier
            from smallDataFrame S
        )
""")
anti_join_df.explain()

让我们看看它给了我们什么:

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, LeftAnti, ((some_identifier#302 = some_identifier#314) || isnull((some_identifier#302 = some_identifier#314)))
:- Scan ExistingRDD[some_identifier#302,first_name#303]
+- BroadcastExchange IdentityBroadcastMode
   +- Scan ExistingRDD[some_identifier#314]

注意Spark Optimizer实际上选择了Broadcast Nested Loop Join而不是Broadcast Hash Join。前者是可以的,因为我们只有两条记录要从左边排除。

还要注意,两个执行计划都有LeftAnti,所以它类似于@eliasah答案,但使用纯SQL实现。另外,它表明您可以对物理执行计划有更多的控制。

p。还要记住,如果右侧数据框比左侧数据框小得多,但比一些记录大,那么您确实希望使用Broadcast Hash Join,而不是Broadcast Nested Loop JoinSort Merge Join。如果没有发生这种情况,您可能需要调整spark.sql.autoBroadcastJoinThreshold,因为它默认为10Mb,但它必须大于"smallDataFrame"的大小。

相关内容

  • 没有找到相关文章

最新更新