Spark Broadcast变量导致在重新启动后从检查点目录重新加载状态时失败



我有一个从Kafka读取数据的spark流应用程序(使用spark 1.6.1(。我使用hdfs上的spark检查点目录来从故障中恢复。

代码在每个批次开始时使用带有映射的广播变量,如下

public static void execute(JavaPairDStream<String, MyEvent> events) {
final Broadcast<Map<String, String>> appConfig =  MyStreamConsumer.ApplicationConfig.getInstance(new    JavaSparkContext(events.context().sparkContext()));

当我在运行时提交作业时,一切都很好,kafka的所有事件都得到了正确处理。问题出现在从故障中恢复时(通过重新启动运行spark的机器进行测试(-spark流应用程序实际上正确启动,并且在运行作业的UI中一切看起来都很好,但一旦通过以下异常发送数据,调用此方法时就会遇到异常(作业崩溃(:

appConfig.value() (the broadcast variable from the start!)

由于Spark作业错误而失败

Caused by: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to java.util.Map

如果我在spark UI中杀死驱动程序,然后从命令行重新提交作业,一切都会恢复正常。但我们的产品要求它可以从故障中自动恢复,甚至只是重新启动任何集群节点,所以我必须解决以上问题。这个问题肯定与Broadcast变量的使用以及重新启动后从spark检查点目录加载状态有关

还要注意的是,我确实正确地创建了广播实例(惰性地/单例(:

public static Broadcast<Map<String, String>> getInstance(JavaSparkContext sparkContext) {
if (instance == null) {

我确实意识到这个问题似乎与:是否可以从Spark流检查点恢复广播值

但我无法按照的指示解决问题

回答我自己的问题,因为其他人可能会觉得它很有用:奇怪的是,如果我将以下代码移到execute方法的开头:appConfig.value((并将其分配给那里的一个普通映射变量,那么以下代码似乎已经解决了这个问题。

然后,如果我只是在我的匿名FlatMapFunction代码中使用use this map变量,而不是appConfig.value((,那么即使在重新启动后,一切都很好。

再说一遍,不确定为什么这有效,但它确实。。。

最新更新