在 Spark 中重用联接的数据帧



我在本地运行HDFS和Spark,并试图了解Spark持久性的工作原理。我的目标是将联接的数据集存储在内存中,然后动态运行查询。但是,我的查询似乎是在重做联接,而不是简单地扫描持久化的预联接数据集。

我已经通过从 HDFS 加载两个 CSV 文件来创建并持久化了两个数据帧,比如 df1 和 df2。我将两个数据帧的连接保留在内存中:

val result = df1.join(df2, "USERNAME")
result.persist()
result.count()

然后,我在结果之上定义一些操作:

val result2 = result.select("FOO", "BAR").groupBy("FOO").sum("BAR")
result2.show()

"result2"不会搭载持久化的结果,而是自行重做联接。以下是结果和结果2的实际计划:

== Physical Plan for result ==
InMemoryColumnarTableScan [...], (InMemoryRelation [...], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [...]), None)

== Physical Plan for result2 ==
TungstenAggregate(key=[FOO#2], functions=[(sum(cast(BAR#10 as double)),mode=Final,isDistinct=false)], output=[FOO#2,sum(BAR)#837])
 TungstenExchange hashpartitioning(FOO#2)
  TungstenAggregate(key=[FOO#2], functions=[(sum(cast(BAR#10 as double)),mode=Partial,isDistinct=false)], output=[FOO#2,currentSum#1311])
   InMemoryColumnarTableScan [FOO#2,BAR#10], (InMemoryRelation [...], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [...]), None)

我会天真地假设,由于连接已经完成并在内存中分区,因此第二个操作将仅包含每个分区上的聚合操作。从头开始重做联接应该更昂贵。我假设不正确还是做错了什么?此外,这是保留联接数据集以供以后查询的正确模式吗?

编辑:作为记录,在我拒绝随机分区的数量后,第二个查询的性能变得更加高效。默认情况下,spark.sql.shuffle.partitions 设置为 200。只需在我的本地实例上将其设置为 1 即可大大提高性能。

如果我们查看计划,我们将看到 Spark 实际上是在利用缓存的数据,而不是重做联接。自下而上:

这是 Spark 从缓存中读取数据:

InMemoryColumnarTableScan [FOO#2,BAR#10], (InMemoryRelation ...

这是 Spark 在每个分区中按 FOO 聚合 BAR - 查找模式=部分

TungstenAggregate(key=[FOO#2], functions=[(sum(cast(BAR#10 as double)),mode=Partial ...

这是 Spark 从上一步的每个分区中洗牌数据:

TungstenExchange hashpartitioning(FOO#2)

这是 Spark 聚合随机分区总和 - 查找模式=最终

TungstenAggregate(key=[FOO#2], functions=[(sum(cast(BAR#10 as double)),mode=Final ...

阅读这些计划有点痛苦,所以如果你可以访问Spark UI的SQL选项卡(我认为是1.5+(,我建议改用它。

相关内容

  • 没有找到相关文章

最新更新