我发现,随着火花的运行,桌子的大小(通过连接)生长,即火花执行者最终将耗尽内存,整个系统崩溃。即使我尝试将临时结果写入Hive Tables(在HDF上),该系统仍然没有太多的内存,并且我的整个系统在加入约130后崩溃。
但是,通过实验,我意识到,如果我将问题分解为较小的零件,将临时结果写入蜂巢表,然后停止/启动Spark Session(和Spark上下文),那么系统的资源就会释放。我能够使用这种方法加入1,000多列。
但是我找不到任何文档来理解这是否被认为是一个好习惯(我知道您不应该一次获得多个会议)。大多数系统在开始时都会获取会话,最后将其关闭。我还可以将应用程序分解为较小的应用程序,并使用像Oozie这样的驱动程序在纱线上安排这些较小的应用程序。但是,这种方法将在每个阶段开始并停止JVM,这似乎有些重量。
所以我的问题是:在单个Spark应用程序运行期间,不断启动/停止Spark Session以免费的系统资源是不好的做法吗?
但是,您能详细说明单个JVM上的单个SparkContext的含义吗?我能够致电sparkSession.sparkContext().stop()
,也可以将stop
称为SparkSession
。然后,我创建了一个新的SparkSession
并使用了新的sparkContext
。没有错误。
我也能够在JavaSparkPi
上使用它而没有任何问题。
我已经在yarn-client
和local
Spark Install中进行了测试。
停止火花上下文的究竟是什么,为什么停止了一个新的?
tl; dr 您可以拥有尽可能多的 SparkSession
s。
您可以在单个JVM上拥有一个和只有一个SparkContext
,但是SparkSession
S的数量几乎是无限的。
但是您能详细说明单个JVM上的单个SparkContext的含义吗?
这意味着在Spark应用程序的生命周期中的任何给定时间,驾驶员只能是一个,只有一个,这又意味着该JVM上只有一个SparkContext
。
Spark应用程序的驱动程序是SparkContext
所处的位置(或者是SparkContext
定义驱动程序的位置 - 区别几乎是模糊的)。
您一次只能拥有一个SparkContext
。尽管您可以尽可能多地按需开始并停止它,但是我记得一个问题,除非您已经完成了Spark,否则您不应该关闭SparkContext
(通常发生在Spark应用程序的末尾)。
换句话说,在Spark应用程序的整个生命周期中都有一个SparkContext
。
有一个类似的问题sparksession.sql vs dataset.sqlcontext.sql有什么区别?大约多个SparkSession
s可以更多地了解为什么要进行两个或多个会话。
我能够致电
sparkSession.sparkContext().stop()
,也可以致电stop
SparkSession
。
所以?!这与我说的是如何矛盾的?您停止了JVM上唯一可用的SparkContext
。没什么大不了的。您可以,但这只是"您只能在一个可用的JVM上只有一个和一个SparkContext
"的一部分,不是吗?
SparkSession
是SparkContext
周围的包装器
从Spark SQL开发人员的点起,SparkSession
的目的是成为查询实体的名称空间,例如查询使用的表,视图或功能(如数据帧,数据集或SQL)和Spark属性(可能具有不同的属性每个SparkSession
)。
如果您想拥有用于不同数据集的相同(临时)表名称,则创建两个SparkSession
S将是我认为推荐的方式。
我刚刚在一个示例上展示了整个阶段代码gen在Spark SQL中的工作方式,并创建了以下内容,以关闭该功能。
// both where and select operators support whole-stage codegen
// the plan tree (with the operators and expressions) meets the requirements
// That's why the plan has WholeStageCodegenExec inserted
// You can see stars (*) in the output of explain
val q = Seq((1,2,3)).toDF("id", "c0", "c1").where('id === 0).select('c0)
scala> q.explain
== Physical Plan ==
*Project [_2#89 AS c0#93]
+- *Filter (_1#88 = 0)
+- LocalTableScan [_1#88, _2#89, _3#90]
// Let's break the requirement of having up to spark.sql.codegen.maxFields
// I'm creating a brand new SparkSession with one property changed
val newSpark = spark.newSession()
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_MAX_NUM_FIELDS
newSpark.sessionState.conf.setConf(WHOLESTAGE_MAX_NUM_FIELDS, 2)
scala> println(newSpark.sessionState.conf.wholeStageMaxNumFields)
2
// Let's see what's the initial value is
// Note that I use spark value (not newSpark)
scala> println(spark.sessionState.conf.wholeStageMaxNumFields)
100
import newSpark.implicits._
// the same query as above but created in SparkSession with WHOLESTAGE_MAX_NUM_FIELDS as 2
val q = Seq((1,2,3)).toDF("id", "c0", "c1").where('id === 0).select('c0)
// Note that there are no stars in the output of explain
// No WholeStageCodegenExec operator in the plan => whole-stage codegen disabled
scala> q.explain
== Physical Plan ==
Project [_2#122 AS c0#126]
+- Filter (_1#121 = 0)
+- LocalTableScan [_1#121, _2#122, _3#123]
然后,我创建了一个新的SparkSession
并使用了新的SparkContext
。没有错误。
再次,这与我所说的有关单个SparkContext
可用的说法是如何矛盾的?我很好奇。
停止火花上下文的究竟是什么?为什么停止了一个新的?
您无法再使用它来运行Spark作业(处理大型和分布式数据集),这几乎完全是您首先使用Spark的原因,不是吗?
尝试以下内容:
- 停止
SparkContext
- 使用Spark Core的RDD或Spark SQL的数据集API 执行任何处理
例外?正确的!还记得您关闭"门"以激发火花,所以您怎么可以期望在里面?!:)