Spark SQL中联接的性能



假设我们有一个健康的集群,并且对于用例,我们有

两个数据集,具有1条Billlion+记录

我们需要比较两个数据集并找出

原始数据集中的重复

我本来打算写一篇

在要检查的列上具有联接的sql查询复制

我想知道会怎样

该查询的性能以及的改进

这可以在数据集(数据帧分区(中完成,然后再加入它们。

请把你的观察结果加进去。

我想知道的性能会如何

与什么相比?至于绝对数字,我认为这显然取决于您的数据和集群。

然而,在Spark 2.0中,性能改进是相当显著的。

以及的改进

Catalyst优化器非常好(在2.0之后更是如此(。在它的下面,它负责您的大多数优化,如列修剪、谓词下推等。(在2.0中,还有代码生成,它负责生成一个非常优化的代码,从而实现非常大的性能改进。(

无论您使用的是数据框架/数据集API还是SQL,这些相同的改进都是全面可用的。

作为Spark的催化剂所做的查询优化的一个例子,假设您有两个具有相同模式的数据帧df1和df2(根据您的情况(,并且您希望在某些列上连接它们,以仅获得交集并输出这些元组。

假设我的数据帧模式如下(调用df.schema(:

StructType(
StructField(df.id,StringType,true), 
StructField(df.age,StringType,true), 
StructField(df.city,StringType,true), 
StructField(df.name,StringType,true))

也就是说,在我的数据集中有id、age、city和name列。

现在,如果你想做什么,你会做一些类似的事情

df1.join(
    df2, 
    $"df2.name"===$"df1.name"
       ).select("df1.id","df1.name", "df1.age", "df1.city" ).show

如果你查看上面的物理计划,你会注意到Catalyst优化器在引擎盖下进行了许多优化:

== Physical Plan ==
*Project [df1.id#898, df1.name#904, df1.age#886, df1.city#892]
+- *BroadcastHashJoin [df1.name#904], [df2.name#880], Inner, BuildRight
   :- *Project [age#96 AS df1.age#886, city#97 AS df1.city#892, id#98 AS df1.id#898, name#99 AS df1.name#904]
   :  +- *Filter isnotnull(name#99)
   :     +- *Scan json [age#96,city#97,id#98,name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<age:string,city:string,id:string,name:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *Project [name#99 AS df2.name#880]
         +- *Filter isnotnull(name#99)
            +- *Scan json [name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:string>

`

特别要注意的是,即使两个相同的数据帧被连接,它们的读取方式也不同——

  1. 谓词下推:从查询中可以明显看出,对于df2,您只需要name列(而不是具有id, age等的整个记录(。如果这些信息被推送到我的数据被读取的位置,那不是很好吗?这将使我免于阅读我不打算使用的不必要的数据。Spark就是这么做的!对于一侧的联接,Spark将只读取name列。此行:+- *Scan json [name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:string>然而,对于另一侧df1,我们希望在联接后的结果中包含所有四列。Spark再次计算出了这一点,并读取了所有四列。此行:+- *Scan json [age#96,city#97,id#98,name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<age:string,city:string,id:string,name:string>

  2. 同样,在阅读并加入Spark之前,您发现自己正在加入name专栏。所以在加入之前,它删除了名为null的元组。此行:+- *Filter isnotnull(name#99)

这意味着Spark已经在为您做所有这些繁重的工作,以便读取最小的数据并将其带入内存(从而减少洗牌和计算时间(。

然而,对于您的特定情况,您可能需要考虑是否可以进一步减少读取的数据——至少对于联接的一侧。如果df2中有许多行具有与df1匹配的相同键组合,该怎么办。你会不会先在df2上做一个不同的处理会更好?例如:

df1.join(
    df2.select("df2.name").distinct, 
    $"df2.name"===$"df1.name"
   ).select("df1.id","df1.name", "df1.age", "df1.city" )

这种顺序的数据集的查询性能无法预测,但可以处理。我使用了一个包含7亿条记录的数据集,下面是帮助调整我的应用程序的突出属性。

  • spark.sql.shuffle.departitions(自己寻找最佳位置(
  • spark.serializer(最好是KryoSerializer(

此外,为应用程序分配集群资源也非常重要。请参阅此博客。谢谢

您是否尝试过根据集群配置将executor核心增加到4个或更多,并且在执行spark提交时做得更好,更不用说执行器的数量了。让spark决定要使用的执行器的数量。在处理庞大的数据集时,这可能会在一定程度上提高性能。

相关内容

  • 没有找到相关文章

最新更新