如何确保由 Spark 数据帧联接引起的分区



我正在重写一个Spark应用程序,以使用更多的DataFrame操作来提高效率和健壮性。但是,应用程序的一部分无法使用DataFrames完成,我必须转到RDD。剥离其基本要素,代码将如下所示:

C = A.join(B, join_key) # join_key is a string naming a column
D = C.rdd.mapPartitions(do_something)

为了正确操作,do_something要求按join_keyC.rdd进行分区。我认为情况会是这样,因为 equijoin 的工作原理是按键对数据进行分区,然后形成键值相同的对。在 Spark RDD 连接中,这些对是由分区数据的迭代器隐式形成的,除非我告诉 Spark 将迭代器"具体化"为对列表,然后对结果进行重新分区,否则这些对将无法离开定义它们的分区,我在这里没有这样做。我希望数据帧联接也是如此。

综上所述,上面的讨论并不能证明所需的分区是可以确保的。我依赖于 Spark 实现的细节,这些细节无法通过 API 保证,我不确定这是 100% 安全的。无法保证 Catalyst 优化器不会将额外的分区边界扔到共享相同密钥的一组对中,从而将其分解并使我的算法不正确。

为了确保所需的分区,我可以在应用 do_something 函数之前显式执行C.rdd.partitionBy(lambda x: x['join_key']),但我担心这可能会触发许多不必要的序列化、洗牌或其他开销。

根据这篇博文,我似乎也可以使用 HiveQL 的DISTRIBUTE BY,但同样,我不知道这可能会触发什么开销。

我的问题是:依赖连接引起的隐式分区是否安全,还是应该显式确保它?如果是这样,确保它的最有效方法是什么?我正在使用 PySpark 1.6.2。

一般来说,特定的连接机制不是合约的一部分,当有关分区的假设失败时,您可以相对轻松地构建一个合成示例。例如,在某些情况下join可以表示为不会触发重新分区的BroadcastHashJoin

from pyspark.sql.functions import broadcast
# Just so we can easily inspect the results
sqlContext.setConf("spark.sql.shuffle.partitions", 4)
a = (sc
    .parallelize([(1, "a"), (2, "b"), (3, "a"), (4, "b")],  2)
    .toDF(["id", "join_key"]))
# Lets hint optimizer that b can be broadcasted
b = broadcast(
    sc.parallelize([("a", "foo"), ("b", "bar")]).toDF(["join_key", "foobar"])
)
c = a.join(b, "join_key")
c.rdd.glom().collect()
## [[Row(join_key='a', id=1, foobar='foo'),
##  Row(join_key='b', id=2, foobar='bar')],
##  [Row(join_key='a', id=3, foobar='foo'),
##  Row(join_key='b', id=4, foobar='bar')]]

在其他一些条件下,广播加入可以在没有明确提示的情况下使用(例如参见Databricks Guide - SQL,DataFrames&Datasets/BroadcastHashJoin),并且不能保证将来不会添加一些额外的机制。

如果要确定结果,则应显式重新分区。

c.repartition("join_key").rdd.glom().collect()
## [[],
##  [Row(join_key='b', id=2, foobar='bar'),
##  Row(join_key='b', id=4, foobar='bar')],
##  [Row(join_key='a', id=1, foobar='foo'),
##   Row(join_key='a', id=3, foobar='foo')],
##  []]

这里的另一个问题是使用DataFrames提高效率和鲁棒性。如果你的逻辑在很大程度上依赖于直接在Python中访问数据(与SQL表达式相反),那么使用DataFrames传递数据几乎是一种反模式。您可以查看我对 Spark 函数与 UDF 性能的回答吗?这涵盖了类似的问题。因此,在采用此方法之前,请务必进行基准测试,因为在许多情况下,移动数据的成本很容易消耗 SQL 优化的所有好处。

相关内容

  • 没有找到相关文章

最新更新