我在本地运行HDFS和Spark,并试图了解Spark持久性的工作原理。我的目标是将联接的数据集存储在内存中,然后动态运行查询。但是,我的查询似乎是在重做联接,而不是简单地扫描持久化的预联接数据集。
我已经通过从 HDFS 加载两个 CSV 文件来创建并持久化了两个数据帧,比如 df1 和 df2。我将两个数据帧的连接保留在内存中:
val result = df1.join(df2, "USERNAME")
result.persist()
result.count()
然后,我在结果之上定义一些操作:
val result2 = result.select("FOO", "BAR").groupBy("FOO").sum("BAR")
result2.show()
"result2"不会搭载持久化的结果,而是自行重做联接。以下是结果和结果2的实际计划:
== Physical Plan for result ==
InMemoryColumnarTableScan [...], (InMemoryRelation [...], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [...]), None)
== Physical Plan for result2 ==
TungstenAggregate(key=[FOO#2], functions=[(sum(cast(BAR#10 as double)),mode=Final,isDistinct=false)], output=[FOO#2,sum(BAR)#837])
TungstenExchange hashpartitioning(FOO#2)
TungstenAggregate(key=[FOO#2], functions=[(sum(cast(BAR#10 as double)),mode=Partial,isDistinct=false)], output=[FOO#2,currentSum#1311])
InMemoryColumnarTableScan [FOO#2,BAR#10], (InMemoryRelation [...], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [...]), None)
我会天真地假设,由于连接已经完成并在内存中分区,因此第二个操作将仅包含每个分区上的聚合操作。从头开始重做联接应该更昂贵。我假设不正确还是做错了什么?此外,这是保留联接数据集以供以后查询的正确模式吗?
编辑:作为记录,在我拒绝随机分区的数量后,第二个查询的性能变得更加高效。默认情况下,spark.sql.shuffle.partitions 设置为 200。只需在我的本地实例上将其设置为 1 即可大大提高性能。
如果我们查看计划,我们将看到 Spark 实际上是在利用缓存的数据,而不是重做联接。自下而上:
这是 Spark 从缓存中读取数据:
InMemoryColumnarTableScan [FOO#2,BAR#10], (InMemoryRelation ...
这是 Spark 在每个分区中按 FOO 聚合 BAR - 查找模式=部分
TungstenAggregate(key=[FOO#2], functions=[(sum(cast(BAR#10 as double)),mode=Partial ...
这是 Spark 从上一步的每个分区中洗牌数据:
TungstenExchange hashpartitioning(FOO#2)
这是 Spark 聚合随机分区总和 - 查找模式=最终
TungstenAggregate(key=[FOO#2], functions=[(sum(cast(BAR#10 as double)),mode=Final ...
阅读这些计划有点痛苦,所以如果你可以访问Spark UI的SQL选项卡(我认为是1.5+(,我建议改用它。