我有一个largeDataFrame
(多列和数十亿行)和一个smallDataFrame
(单列和10,000行)。
当largeDataFrame
中的some_identifier
列与smallDataFrame
中的一行匹配时,我想过滤largeDataFrame
中的所有行。
下面是一个例子:
largeDataFrame
some_idenfitier,first_name
111,bob
123,phil
222,mary
456,sue
smallDataFrame
some_identifier
123
456
desiredOutput
111,bob
222,mary
这是我丑陋的解决方案。
val smallDataFrame2 = smallDataFrame.withColumn("is_bad", lit("bad_row"))
val desiredOutput = largeDataFrame.join(broadcast(smallDataFrame2), Seq("some_identifier"), "left").filter($"is_bad".isNull).drop("is_bad")
有更干净的解决方案吗?
在本例中需要使用left_anti
连接。
左反连接与左半连接相反。
根据给定的键从左表中过滤出右表中的数据:
largeDataFrame
.join(smallDataFrame, Seq("some_identifier"),"left_anti")
.show
// +---------------+----------+
// |some_identifier|first_name|
// +---------------+----------+
// | 222| mary|
// | 111| bob|
// +---------------+----------+
纯Spark SQL版本(以PySpark为例,但有一些小变化)同样适用于Scala API):
def string_to_dataframe (df_name, csv_string):
rdd = spark.sparkContext.parallelize(csv_string.split("n"))
df = spark.read.option('header', 'true').option('inferSchema','true').csv(rdd)
df.registerTempTable(df_name)
string_to_dataframe("largeDataFrame", '''some_identifier,first_name
111,bob
123,phil
222,mary
456,sue''')
string_to_dataframe("smallDataFrame", '''some_identifier
123
456
''')
anti_join_df = spark.sql("""
select *
from largeDataFrame L
where NOT EXISTS (
select 1 from smallDataFrame S
WHERE L.some_identifier = S.some_identifier
)
""")
print(anti_join_df.take(10))
anti_join_df.explain()
将输出Mary和bob:
[行(some_identifier = 222, first_name ="玛丽"),
行(some_identifier = 111, first_name ="鲍勃")]
和物理执行计划将显示它正在使用
== Physical Plan ==
SortMergeJoin [some_identifier#252], [some_identifier#264], LeftAnti
:- *(1) Sort [some_identifier#252 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(some_identifier#252, 200)
: +- Scan ExistingRDD[some_identifier#252,first_name#253]
+- *(3) Sort [some_identifier#264 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(some_identifier#264, 200)
+- *(2) Project [some_identifier#264]
+- Scan ExistingRDD[some_identifier#264]
注意Sort Merge Join
对于连接/反连接大约相同大小的数据集更有效。既然你已经提到小数据帧更小,我们应该确保Spark优化器选择Broadcast Hash Join
,这将在这种情况下更有效:
我将NOT EXISTS
改为NOT IN
子句:
anti_join_df = spark.sql("""
select *
from largeDataFrame L
where L.some_identifier NOT IN (
select S.some_identifier
from smallDataFrame S
)
""")
anti_join_df.explain()
让我们看看它给了我们什么:
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, LeftAnti, ((some_identifier#302 = some_identifier#314) || isnull((some_identifier#302 = some_identifier#314)))
:- Scan ExistingRDD[some_identifier#302,first_name#303]
+- BroadcastExchange IdentityBroadcastMode
+- Scan ExistingRDD[some_identifier#314]
注意Spark Optimizer实际上选择了Broadcast Nested Loop Join
而不是Broadcast Hash Join
。前者是可以的,因为我们只有两条记录要从左边排除。
还要注意,两个执行计划都有LeftAnti
,所以它类似于@eliasah答案,但使用纯SQL实现。另外,它表明您可以对物理执行计划有更多的控制。
p。还要记住,如果右侧数据框比左侧数据框小得多,但比一些记录大,那么您确实希望使用Broadcast Hash Join
,而不是Broadcast Nested Loop Join
或Sort Merge Join
。如果没有发生这种情况,您可能需要调整spark.sql.autoBroadcastJoinThreshold,因为它默认为10Mb,但它必须大于"smallDataFrame"的大小。