Left table in a broadcast join

This is my join:

df = df_small.join(df_big, 'id', 'leftanti')

It seems I can only broadcast the right-hand dataframe. But for my logic to work (a left anti join), I have to put df_small on the left side.

How can I broadcast the dataframe on the left?


Example:

from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df_small = spark.range(2)
df_big = spark.range(1, 5000000)
#    df_small     df_big
#    +---+        +-------+
#    | id|        |     id|
#    +---+        +-------+
#    |  0|        |      1|
#    |  1|        |      2|
#    +---+        |    ...|
#                 |4999999|
#                 +-------+
df_small = F.broadcast(df_small)
df = df_small.join(df_big, 'id', 'leftanti')
df.show()
df.explain()
#    +---+
#    | id|
#    +---+
#    |  0|
#    +---+
#
#    == Physical Plan ==
#    AdaptiveSparkPlan isFinalPlan=false
#    +- SortMergeJoin [id#197L], [id#199L], LeftAnti
#       :- Sort [id#197L ASC NULLS FIRST], false, 0
#       :  +- Exchange hashpartitioning(id#197L, 200), ENSURE_REQUIREMENTS, [id=#1406]
#       :     +- Range (0, 2, step=1, splits=2)
#       +- Sort [id#199L ASC NULLS FIRST], false, 0
#          +- Exchange hashpartitioning(id#199L, 200), ENSURE_REQUIREMENTS, [id=#1407]
#             +- Range (1, 5000000, step=1, splits=2)

Unfortunately, this is not possible. As the explain() output in your example shows, the broadcast hint on the left side of a left anti join is simply ignored and Spark falls back to a SortMergeJoin.

Spark can only broadcast the left-side table for right outer joins.
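
For comparison, a minimal sketch (reusing df_small, df_big and the F import from the question) of a case where a left-side hint is honored; with a right outer join, explain() shows a BroadcastHashJoin building on the left instead of a SortMergeJoin:

# the left side can serve as the broadcast (build) side of a right outer join
plan_ok = F.broadcast(df_small).join(df_big, 'id', 'right_outer')
plan_ok.explain()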

You can get the desired result by splitting the left anti join into two joins: an inner join followed by a left join.

from pyspark.sql.functions import broadcast, col
from pyspark.sql.types import IntegerType

df1 = spark.createDataFrame([1, 2, 3, 4, 5], IntegerType())
df2 = spark.createDataFrame([(1, 'a'), (2, 'b')], ['value', 'col'])
# rows of df1 whose value also appears in df2
inner = df1.join(broadcast(df2), 'value', 'inner')
# a left join against them tags matches with a non-null 'col'; keeping the nulls yields the anti-join
out = df1.join(broadcast(inner), 'value', 'left').where(col('col').isNull()).drop('col')
out.show()
+-----+
|value|
+-----+
|    3|
|    4|
|    5|
+-----+
# for comparison, the native left anti join returns the same rows (row order is not guaranteed)
df1.join(df2, 'value', 'left_anti').show()
+-----+
|value|
+-----+
|    5|
|    3|
|    4|
+-----+
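
A quick equivalence check (a minimal sketch; a left anti join does not guarantee row order, so compare sorted values):

assert sorted(r.value for r in out.collect()) == \
       sorted(r.value for r in df1.join(df2, 'value', 'left_anti').collect())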

Based on this idea, I created a function.

The logic:

# ids of df_small that also exist in df_big; broadcasting df_small keeps df_big from being shuffled
df_inner = df_big.join(broadcast(df_small), 'id', 'inner').select('id').distinct()
# df_inner is at most as big as df_small, so it too can be broadcast, this time on the right side
df_out = df_small.join(broadcast(df_inner), 'id', 'leftanti')

The function and its usage:

def leftanti_on_small(df_small, df_big, on):
    df_inner = df_big.join(broadcast(df_small), on, 'inner').select(on).distinct()
    return df_small.join(broadcast(df_inner), on, 'leftanti')

df_out = leftanti_on_small(df_small, df_big, on='id')
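
A side note, not from the original answer: .select(on) works here because on is a single column name. A hypothetical variant that also accepts a list of join columns could unpack it:

def leftanti_on_small(df_small, df_big, on):
    # hypothetical generalization: accept both 'id' and ['id', 'day']
    cols = [on] if isinstance(on, str) else list(on)
    df_inner = df_big.join(broadcast(df_small), cols, 'inner').select(*cols).distinct()
    return df_small.join(broadcast(df_inner), cols, 'leftanti')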

Result:

df_out.show()
df_out.explain()
#  +---+
#  | id|
#  +---+
#  |  0|
#  +---+
#  
#  == Physical Plan ==
#  AdaptiveSparkPlan isFinalPlan=false
#  +- BroadcastHashJoin [id#209L], [id#211L], LeftAnti, BuildRight, false
#     :- Range (0, 2, step=1, splits=2)
#     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#1587]
#        +- HashAggregate(keys=[id#211L], functions=[])
#           +- HashAggregate(keys=[id#211L], functions=[])
#              +- Project [id#211L]
#                 +- BroadcastHashJoin [id#211L], [id#217L], Inner, BuildRight, false
#                    :- Range (1, 5000000, step=1, splits=2)
#                    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#1581]
#                       +- Range (0, 2, step=1, splits=2)
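
Note that this plan contains only BroadcastExchange operators and no hashpartitioning Exchange: the big Range (standing in for df_big) is consumed without a shuffle, and both broadcasts involve only small relations (df_small and the deduplicated df_inner).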
