我有两个大型蜂巢表,我想加入spark.sql。假设我们有表1和表2,表1中有500万行,表2上的行2。
我想加入它们,并在某些列上进行一些聚合,以便在用两个条件进行过滤时计数所有行和平均列(例如DoubleColumn)的平均值(在Col1,Col2上说)。
注意:我在单台计算机上的测试安装中工作(虽然很强大)。我希望在集群中表现可能会有所不同。
我的第一次尝试是使用Spark SQL,例如:
val stat = sqlContext.sql("select count(id), avg(doubleColumn) " +
" FROM db.table1 as t1 JOIN db.table2 " +
" ON t1.id = t2.id " +
" WHERE col1 = val1 AND col2 = val2").collect
不幸的是,即使我每个执行人和驱动程序至少给出8 GB的内存,大约5分钟的运行量也很差。我还尝试使用DataFrame语法,并尝试首先过滤行并仅选择特定列以具有更好的选择性,例如:
//Filter first and select only needed column
val df = spark.sql("SELECT * FROM db.tab1")
val tab1= df.filter($"col1" === "val1" && $"col2" === "val2").select("id")
val tab2= spark.sql("SELECT id, doubleColumn FROM db.tab2")
val joined = tab1.as("d1").join(tab2.as("d2"), $"d1.id" === $"d2.id")
//Take the aggregations on the joined df
import org.apache.spark.sql.functions;
joined.agg(
functions.count("id").as("count"),
functions.avg("doubleColumn").as("average")
).show();
但这没有显着的性能增长。如何提高加入的性能?
哪种是执行此Spark.sql或DataFrame语法的最佳方法?
给更多执行者或内存会有所帮助?
我应该使用缓存吗?
我在dataframes tab1,tab2和加入聚合中均具有显着增益,但我认为可以缓存我的数据范围,因为我们对并发感兴趣,许多用户同时询问一些分析性查询。无事可做,因为我在单个节点上工作,当我在集群上进行生产环境时,我的问题会消失?
奖励问题:我尝试了Impala的查询,并且大约40秒钟,但它比Spark.sql好得多。Impala怎么能比火花更好?!
哪种是执行此Spark.sql或DataFrame语法的最佳方法?
没有任何区别。
给更多执行者或内存会有所帮助吗?
仅在数据偏斜引起并且您正确调整配置时才仅引起问题。
我应该使用缓存吗?
如果多次重复使用输入数据,则建议(如您已经确定)性能。
没有什么可做的,因为我在单个节点上工作,当我在集群上进入生产环境时,我的问题会消失?
在单个节点上的一般性能测试中,完全没有用。它错过了瓶颈(网络IO/通信)和优势(摊销磁盘I/O和资源使用情况)。
但是,您可以显着降低相似之处(spark.sql.shuffle.partitions
,sql.default.parallelism
和增加输入拆分尺寸)。与资产相比,专为分发负载而设计的反发明火花风格的并行性是单个机器的责任。与共享记忆相比,这取决于混乱(磁盘写入!),使事情变得非常慢,而安排开销非常重要。
黑斑羚如何比火花更好?!
因为它是专门为低延迟并发查询而设计的。这不是Spark的目标(数据库与ETL框架)。
当你
由于我们对并发感兴趣,许多用户同时询问某些分析查询。
Spark听起来似乎不是一个正确的选择。
您可以更改配置,无论如何您都必须在大型集群上更改它们。我可以立即想到两件事。将spark.executor.cores
设置为5,并根据内存,用spark.executor.instances
和spark.executor.memory
给出更多执行者和更多内存。另外,您可以用某个列来铲斗和对蜂巢表进行排序吗?如果将桌子贴上来,则将消除在加入表之前对表进行分类的需求。
,如果您在加入之后缓存数据框,则可能会更快,这取决于催化剂如何处理聚合查询。查询结束后,您也可以unpersist()
,但我同意GC可能不值得。
您不会使用SQL或Scala DSL看到任何好处。两者都使用全阶段代码生成,因此它们基本相同。
Impala总是更快的一个原因是因为它永远不会担心复制,尽管一个节点不应该烦恼,但是在预读数据之间的spark可能没有优美的分离来复制而不需要复制之间。