我的设置是:在3个节点纱线簇上的Spark 2.1,带有160 GB,48个VCORS。动态分配已打开。 spark.executor.memory=6G
,spark.executor.cores=6
首先,我正在阅读蜂巢表:订单(329MB)和lineItems(1.43GB)和做一个左外连接。接下来,我根据连接的7种不同的过滤条件数据集(例如var line1 = joinedDf.filter("linenumber=1")
,var line2 = joinedDf.filter("l_linenumber=2")
等)。因为我要多次在加入数据集上过滤,所以我认为执行持久性(MEMORY_ONLY
)会在这里有所帮助,因为加入数据集将完全适合内存。
-
我注意到,持久性,火花应用程序要比没有持久性的时间更长(3.5分钟vs 3.3分钟)。随着持久性,DAG表明,为持久性创建了一个单个阶段,其他下游作业正在等待持久性完成。这是否意味着坚持是一个阻止电话?或其他工作中的阶段是否可以在持续的块可用时开始处理?
-
在非专业案例中,不同的作业正在创建不同的阶段以读取相同的数据。数据在不同阶段被多次读取,但事实证明这仍然比持续情况更快。
-
使用较大的数据集,持久实际上会导致执行者用完内存(Java堆空间)。没有坚持不懈,Spark Jobs就可以了。我在这里查看了其他一些建议:Spark
java.lang.OutOfMemoryError: Java heap space
。我尝试增加/减少执行者核心,持续存在仅使用磁盘,增加了分区,修改存储率,但似乎没有任何帮助解决执行器内存问题。
如果有人能提及持久性的工作方式,我将不胜感激,在什么情况下,它比不持有的更快,更重要的是,如何解决记忆问题。
我建议阅读有关火花中转换和动作之间的区别。我必须承认,我本人多次被这个。
Spark中的数据懒洋洋地评估了,这实际上意味着直到"行动"之前什么都没有发生。执行。.filter()
函数是一个转换,因此当您的代码达到该点时,实际上什么也不会发生,除了将部分添加到转换管道中。呼叫.persist()
的行为方式相同。
如果.persist()
调用的下游您的代码具有多个可以同时触发的操作,那么您实际上很有可能是"持久"每个动作的数据分别吞噬记忆(Spark UI中的"存储"选项卡会告诉您数据集的%缓存,如果它的缓存超过100%,那么您在这里看到了我在这里描述的内容)。更糟糕的是,您可能永远不会实际使用缓存的数据。
通常,如果您在代码中有一个点,数据集将分叉分为两个单独的转换管道(示例中的每个单独的.filter()
S),则.persist()
是一个好主意,可以防止对数据源进行多次读数,并且/或保存货叉之前昂贵的转换管道的结果。
多次,最好在.persist()
调用后(在数据叉之前)触发单个动作,以确保以后的操作(可能同时运行)从持久的缓存中读取,而不是评估(并且无用的缓存)数据独立。
tl; dr:
在您的.persist()
之后进行joinedDF.count()
,但是在您的.filter()
s。