Spark 创建新的 Spark 会话/上下文并从失败中恢复



我工作的Spark平台不稳定,每次都以各种原因使我的工作失败。这项工作并没有死在Hadoop管理器上,而是作为Running徘徊,所以我想杀死它。

在同一个 python 脚本中,我想在出现故障后杀死当前的 spark 会话,创建另一个 sparkcontext/会话并从最后一个检查点开始。我确实有频繁的检查点,以避免DAG变得太长。它容易失败的部分是一个while循环,所以我可以负担得起当前的df

知道我如何实现这一目标吗?

我最初的想法是

sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName("test_Terminal").config("spark.sql.broadcastTimeout", "36000").getOrCreate()
flag_finish = False
flag_fail=False
while (!flag_finish) :
if flag_fail : #kill current erroneous session 
sc.stop()
conf = pyspark.SparkConf().setAll([('spark.executor.memory', '60g'), 
('spark.driver.memory','30g'),('spark.executor.cores', '16'), 
('spark.driver.cores', '24'),('spark.cores.max', '32')])
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
df = ...#read back from checkpoint or disk
#process with current df or df picked up
while .. :#this is where server tend to fail my job due after some time
try :
##df processing and update
...
df.checkpoint()
df.count() #activate checkpoint 
if complete :
flag_finished = True
exception Exception as e:
flag_fail=True
continue

另一个问题是如何从检查点显式读取(这已经由df.checkpoint((完成(

非流式处理中的检查点是使用服务器世系。它不是为在不同应用程序或不同 Spark 上下文之间共享数据而设计的。

事实上,你想要的是不可能的。

最新更新