假设我们有一个健康的集群,并且对于用例,我们有
两个数据集,具有1条Billlion+记录
我们需要比较两个数据集并找出
原始数据集中的重复
我本来打算写一篇
在要检查的列上具有联接的sql查询复制
我想知道会怎样
该查询的性能以及的改进
这可以在数据集(数据帧分区(中完成,然后再加入它们。
请把你的观察结果加进去。
我想知道的性能会如何
与什么相比?至于绝对数字,我认为这显然取决于您的数据和集群。
然而,在Spark 2.0中,性能改进是相当显著的。
以及的改进
Catalyst优化器非常好(在2.0之后更是如此(。在它的下面,它负责您的大多数优化,如列修剪、谓词下推等。(在2.0中,还有代码生成,它负责生成一个非常优化的代码,从而实现非常大的性能改进。(
无论您使用的是数据框架/数据集API还是SQL,这些相同的改进都是全面可用的。
作为Spark的催化剂所做的查询优化的一个例子,假设您有两个具有相同模式的数据帧df1和df2(根据您的情况(,并且您希望在某些列上连接它们,以仅获得交集并输出这些元组。
假设我的数据帧模式如下(调用df.schema
(:
StructType(
StructField(df.id,StringType,true),
StructField(df.age,StringType,true),
StructField(df.city,StringType,true),
StructField(df.name,StringType,true))
也就是说,在我的数据集中有id、age、city和name列。
现在,如果你想做什么,你会做一些类似的事情
df1.join(
df2,
$"df2.name"===$"df1.name"
).select("df1.id","df1.name", "df1.age", "df1.city" ).show
如果你查看上面的物理计划,你会注意到Catalyst优化器在引擎盖下进行了许多优化:
== Physical Plan ==
*Project [df1.id#898, df1.name#904, df1.age#886, df1.city#892]
+- *BroadcastHashJoin [df1.name#904], [df2.name#880], Inner, BuildRight
:- *Project [age#96 AS df1.age#886, city#97 AS df1.city#892, id#98 AS df1.id#898, name#99 AS df1.name#904]
: +- *Filter isnotnull(name#99)
: +- *Scan json [age#96,city#97,id#98,name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<age:string,city:string,id:string,name:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- *Project [name#99 AS df2.name#880]
+- *Filter isnotnull(name#99)
+- *Scan json [name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:string>
`
特别要注意的是,即使两个相同的数据帧被连接,它们的读取方式也不同——
谓词下推:从查询中可以明显看出,对于df2,您只需要
name
列(而不是具有id, age
等的整个记录(。如果这些信息被推送到我的数据被读取的位置,那不是很好吗?这将使我免于阅读我不打算使用的不必要的数据。Spark就是这么做的!对于一侧的联接,Spark将只读取name
列。此行:+- *Scan json [name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:string>
然而,对于另一侧df1
,我们希望在联接后的结果中包含所有四列。Spark再次计算出了这一点,并读取了所有四列。此行:+- *Scan json [age#96,city#97,id#98,name#99] Format: JSON, PushedFilters: [IsNotNull(name)], ReadSchema: struct<age:string,city:string,id:string,name:string>
同样,在阅读并加入Spark之前,您发现自己正在加入
name
专栏。所以在加入之前,它删除了名为null
的元组。此行:+- *Filter isnotnull(name#99)
。
这意味着Spark已经在为您做所有这些繁重的工作,以便读取最小的数据并将其带入内存(从而减少洗牌和计算时间(。
然而,对于您的特定情况,您可能需要考虑是否可以进一步减少读取的数据——至少对于联接的一侧。如果df2中有许多行具有与df1匹配的相同键组合,该怎么办。你会不会先在df2上做一个不同的处理会更好?例如:
df1.join(
df2.select("df2.name").distinct,
$"df2.name"===$"df1.name"
).select("df1.id","df1.name", "df1.age", "df1.city" )
这种顺序的数据集的查询性能无法预测,但可以处理。我使用了一个包含7亿条记录的数据集,下面是帮助调整我的应用程序的突出属性。
- spark.sql.shuffle.departitions(自己寻找最佳位置(
- spark.serializer(最好是KryoSerializer(
此外,为应用程序分配集群资源也非常重要。请参阅此博客。谢谢
您是否尝试过根据集群配置将executor核心增加到4个或更多,并且在执行spark提交时做得更好,更不用说执行器的数量了。让spark决定要使用的执行器的数量。在处理庞大的数据集时,这可能会在一定程度上提高性能。