This is my join:
df = df_small.join(df_big, 'id', 'leftanti')
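It seems I can only broadcast the right-hand DataFrame. But for my logic to work (a left anti join), I have to put df_small on the left side.
How can I broadcast the left DataFrame?
Example: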
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df_small = spark.range(2)
df_big = spark.range(1, 5000000)
# df_small     df_big
# +---+        +-------+
# | id|        |     id|
# +---+        +-------+
# |  0|        |      1|
# |  1|        |      2|
# +---+        |    ...|
#              |4999999|
#              +-------+
df_small = F.broadcast(df_small)
df = df_small.join(df_big, 'id', 'leftanti')
df.show()
df.explain()
# +---+
# | id|
# +---+
# |  0|
# +---+
#
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- SortMergeJoin [id#197L], [id#199L], LeftAnti
#    :- Sort [id#197L ASC NULLS FIRST], false, 0
#    :  +- Exchange hashpartitioning(id#197L, 200), ENSURE_REQUIREMENTS, [id=#1406]
#    :     +- Range (0, 2, step=1, splits=2)
#    +- Sort [id#199L ASC NULLS FIRST], false, 0
#       +- Exchange hashpartitioning(id#199L, 200), ENSURE_REQUIREMENTS, [id=#1407]
#          +- Range (1, 5000000, step=1, splits=2)
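Unfortunately, this is not possible.
Spark can broadcast the left-side table only for inner and right outer joins, not for a left anti join.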
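To illustrate the rule, here is a rough plain-Python model of which side a broadcast hash join may use as its build (broadcast) side for each join type. It is a hypothetical simplification loosely based on Spark 3.x's join-selection checks (`canBuildBroadcastLeft` / `canBuildBroadcastRight` in the Scala source), not a PySpark API, and it ignores broadcast nested-loop joins, size thresholds and AQE.

```python
# Hypothetical model, loosely based on Spark 3.x broadcast hash join rules.
# This is NOT a PySpark API; join-type strings follow the names DataFrame.join accepts.
BUILD_LEFT_OK = {"inner", "cross", "right"}
BUILD_RIGHT_OK = {"inner", "cross", "left", "leftsemi", "leftanti"}


def broadcastable_sides(how):
    """Return the side(s) that may be broadcast for this join type."""
    sides = set()
    if how in BUILD_LEFT_OK:
        sides.add("left")
    if how in BUILD_RIGHT_OK:
        sides.add("right")
    return sides


for how in ["inner", "left", "right", "full", "leftsemi", "leftanti"]:
    print(f"{how:>9}: {sorted(broadcastable_sides(how)) or 'none'}")
```

Under this model `leftanti` can only build on the right, which matches the question's plan: the broadcast hint on `df_small` was not used and Spark chose a sort-merge join instead.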
You can get the desired result by splitting the left anti join into two joins: an inner join followed by a left join.
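from pyspark.sql.functions import broadcast, col
from pyspark.sql.types import IntegerType

df1 = spark.createDataFrame([1, 2, 3, 4, 5], IntegerType())
df2 = spark.createDataFrame([(1, 'a'), (2, 'b')], ['value', 'col'])
# Step 1: the rows of df1 that do have a match in df2
inner = df1.join(broadcast(df2), 'value', 'inner')
# Step 2: left join df1 to those matches and keep only the unmatched rows
out = df1.join(broadcast(inner), 'value', 'left').where(col('col').isNull()).drop('col')
out.show()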
+-----+
|value|
+-----+
|    3|
|    4|
|    5|
+-----+
df1.join(df2, 'value', 'left_anti').show()
+-----+
|value|
+-----+
|    5|
|    3|
|    4|
+-----+
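Both outputs contain the same rows. A plain-Python model of the rewrite shows why: the inner join collects the rows that do match, so after the left join only the unmatched rows end up with a null `col`. This is a sketch with lists of dicts standing in for DataFrames; all function names are hypothetical and no Spark is involved.

```python
# Plain-Python model of the two-join rewrite (hypothetical helpers, no Spark).
df1 = [{"value": v} for v in [1, 2, 3, 4, 5]]
df2 = [{"value": 1, "col": "a"}, {"value": 2, "col": "b"}]


def inner_join(left, right, on):
    """Pair every left row with every right row that has the same key."""
    return [{**l, **r} for l in left for r in right if l[on] == r[on]]


def left_join(left, right, on, right_cols):
    """Keep every left row; fill right_cols with None when nothing matches."""
    out = []
    for l in left:
        matches = [r for r in right if r[on] == l[on]]
        if matches:
            out.extend({**l, **r} for r in matches)
        else:
            out.append({**l, **{c: None for c in right_cols}})
    return out


def left_anti(left, right, on):
    """Reference semantics: left rows whose key never appears on the right."""
    right_keys = {r[on] for r in right}
    return [l for l in left if l[on] not in right_keys]


# Step 1: inner join; step 2: left join, then keep rows where "col" is None.
inner = inner_join(df1, df2, "value")
joined = left_join(df1, inner, "value", right_cols=["col"])
rewritten = [{"value": r["value"]} for r in joined if r["col"] is None]

print(rewritten)
print(left_anti(df1, df2, "value"))
```

One assumption is worth stating: the `col IS NULL` filter treats a null `col` as "no match", so it is only correct if `col` is never null in `df2`. The `leftanti_on_small` variant avoids this by using a left anti join for the second step.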
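Based on this idea, I created a function.
Without a function:
from pyspark.sql.functions import broadcast

df_inner = df_big.join(broadcast(df_small), 'id', 'inner').select('id').distinct()
df_out = df_small.join(broadcast(df_inner), 'id', 'leftanti')
With a function:
from pyspark.sql.functions import broadcast

def leftanti_on_small(df_small, df_big, on):
    # Keys of df_small that also exist in df_big; distinct() leaves at most one row per key
    df_inner = df_big.join(broadcast(df_small), on, 'inner').select(on).distinct()
    # Anti join with the small key set as the broadcast (right) side
    return df_small.join(broadcast(df_inner), on, 'leftanti')

df_out = leftanti_on_small(df_small, df_big, on='id')
Result: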
df_out.show()
df_out.explain()
# +---+
# | id|
# +---+
# |  0|
# +---+
#
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- BroadcastHashJoin [id#209L], [id#211L], LeftAnti, BuildRight, false
#    :- Range (0, 2, step=1, splits=2)
#    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#1587]
#       +- HashAggregate(keys=[id#211L], functions=[])
#          +- HashAggregate(keys=[id#211L], functions=[])
#             +- Project [id#211L]
#                +- BroadcastHashJoin [id#211L], [id#217L], Inner, BuildRight, false
#                   :- Range (1, 5000000, step=1, splits=2)
#                   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#1581]
#                      +- Range (0, 2, step=1, splits=2)
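The `.distinct()` in `leftanti_on_small` is what keeps the second broadcast small: the inner join produces one row per matching `df_big` row, so a key repeated many times in `df_big` would inflate the intermediate result, while after deduplication it holds at most one row per distinct key of `df_small`. Below is a rough plain-Python model of the function (hypothetical name `leftanti_on_small_model`, lists of keys standing in for DataFrames, no Spark). It assumes there are no null keys, since Python's `None == None` does not follow SQL NULL semantics.

```python
# Plain-Python model of leftanti_on_small (hypothetical, no Spark, no null keys).
def leftanti_on_small_model(small_keys, big_keys):
    """Return (anti-join result, deduplicated matched keys)."""
    small_set = set(small_keys)
    # Step 1: keys of "big" that also appear in "small", deduplicated
    # (mirrors the inner join followed by .select(on).distinct()).
    matched = {k for k in big_keys if k in small_set}
    # Step 2: anti join of "small" against the small matched-key set.
    return [k for k in small_keys if k not in matched], matched


small = [0, 1]
big = [1] * 1000 + list(range(2, 100))  # key 1 is repeated many times

result, matched = leftanti_on_small_model(small, big)
small_set = set(small)
inner_rows = sum(1 for k in big if k in small_set)  # row count before .distinct()

print(result, len(matched), inner_rows)
```

Here the intermediate join would have 1000 rows before deduplication but only one afterwards, which is the side that gets broadcast in the final anti join.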