连接的Spark性能分析



输入数据

我有两个表从MySQL导出为csv文件。

表1磁盘大小:250 MB记录:70万

表2的磁盘大小:350 MB记录:60万

更新代码

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val table-one = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("example-input-files/table-1-data.csv”)
table-one.registerTempTable(“table-one”)
val table-two = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("example-input-files/table-2-data.csv”)
table-two.registerTempTable(“table”-two)
sqlContext.cacheTable(“table-one”)
sqlContext.cacheTable(“table-two”)
val result = sqlContext.sql("SELECT table-one.ID,table-two.ID FROM table-one LEFT JOIN table-two ON table-one.ID = table-two.ID")
result.take(2).foreach(println)

Spark Job

  • 使用Databricks csv读取两个csv文件Lib并将其注册为表。

  • 使用公共列(典型的左列)对两者执行左连接

  • 打印前两个结果,因为在控制台上打印本身会消耗时间。

整个过程耗时30秒。我在一台有足够内存的机器上运行,这样两个文件都可以放进去(毕竟是600Mb)。

我有两种方法来做这项工作。

  • 作为一个整体运行作业,即加载所有csv,运行join,然后打印结果
  • 第二种方式,我首先使用sqlContext.cacheTable("the_table") 在内存中运行和缓存表

缓存后,我发现连接操作本身需要8秒才能完成。

这个时间合理吗?我猜它不是,有很多优化可以加快查询。

我看到的优化

  • 将数据放入HDFS而不是本地磁盘。这会加快检索速度吗?
  • 在集群上运行,我猜这不会很快,因为数据可以放入内存,顺序将更快。
  • 建模数据和使用cassandra会更快吗?
  • 我使用普通SQL连接,RDD连接会更快吗?

还有什么办法可以把事情做得更好吗?

正如评论者提到的,Spark是为分布式计算而设计的。在本地处理小数据时,仅所有初始化和调度的开销就足以使Spark看起来比其他PL慢。

在集群上运行,我猜这不会很快,因为数据可以存储在内存中,顺序操作会更快。

只要你的代码执行狭窄的转换,执行器实际上就会在内存中的数据本地副本上工作,所以这并不完全正确。但是,您的代码执行连接,这是一个广泛的转换—这意味着必须在整个网络中对块进行洗牌。记住这一点。广泛的转换是昂贵的,所以尽可能把它们放在DAG的末尾。但是,您的数据足够小,您可能看不到好处。

另一件事是,如果你有Hive,那么你可以考虑将数据存储在你的join列分区的表中。

相关内容

  • 没有找到相关文章

最新更新