我在项目中使用Spark Streaming(Spark V1.6.0(和HBase,HBase(HBase V1.1.2(配置在具有广播变量的执行器之间传输。Spark 流应用程序首先工作,而大约 2 天后,会出现异常。
val hBaseContext: HBaseContext = new HBaseContext(sc, HBaseCock.hBaseConfiguration())
private def _materialDStream(dStream: DStream[(String, Int)], columnName: String, batchSize: Int) = {
hBaseContext.streamBulkIncrement[(String, Int)](
dStream,
hTableName,
(t) => {
val rowKey = t._1
val incVal = t._2
val increment = new Increment(Bytes.toBytes(rowKey))
increment.addColumn(Bytes.toBytes(hFamily), Bytes.toBytes(columnName), incVal)
increment
}, batchSize)
}
HBaseContext的整个源文件可以在下面找到HBaseContext.scala,一些片段可以在下面找到。
运行数天后,将出现异常,堆栈跟踪为:
Unable to getConfig from broadcast
16/02/01 10:08:10 ERROR Executor: Exception in task 3.0 in stage 187175.0 (TID 561527)
逻辑如下:
- 使用 config 创建 HBaseContext (
- HBaseContext( 并广播配置(如果指定了文件路径,则将配置保存到文件(
- 在连接 HBase 之前,首先它会检查字段配置是否为空,如果是,则从指定的文件还原它,或者如果没有指定文件路径,则从广播变量恢复它。
从广播变量恢复配置时出现问题,并且在"configBroadcast.value.value"中从广播读取值时发生异常。
我猜如果 Spark Streaming 不会在 master 失败时恢复广播变量,而 getOrCreate(( 用于获取 SparkStreaming 实例。我更好奇的是,在HBaseContext.scala源代码中,该文件优先于广播变量以恢复值。那么,在 Spark 流中使用广播的最佳做法是什么?我是否需要将它们存储在文件中,比如HDFS中的文件?
class HBaseContext(@transient sc: SparkContext, @transient config: Configuration, val tmpHdfsConfgFile: String = null) extends Serializable{
@transient var tmpHdfsConfiguration: Configuration = config
val broadcastedConf = sc.broadcast(new SerializableWritable(config))
if(tmpHdfsConfgFile != null && config != null){
// save config to file
}
private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]): Configuration = {
if (tmpHdfsConfiguration != null) {
tmpHdfsConfiguration
} else if (tmpHdfsConfgFile != null) {
// read config from file
tmpHdfsConfiguration
}
if (tmpHdfsConfiguration == null) {
try {
// Exception happens here!!!
tmpHdfsConfiguration = configBroadcast.value.value
tmpHdfsConfiguration
} catch {
case ex: Exception => {
println("Unable to getConfig from broadcast")
}
}
}
tmpHdfsConfiguration
}
}
由于某种原因重新启动 Spark 作业后,广播变量将重置。或者驱动程序与作业失败后的尝试重新关联。
在流式处理作业的情况下,要使用广播变量,应该在创建 StreamingContext 之前从 sprarkContext 初始化广播。这将确保广播变量在流式传输开始时可用。
JavaSparkContext javaSparkContext = createSparkContext();
Broadcast<BroadcastContext> broadcastContext = getBroadcastContext(javaSparkContext);
JavaStreamingContext javaStreamingContext = JavaStreamingContext.getOrCreate(sparkCheckPointDir,
() -> processor.create(sparkCheckPointDir, javaSparkContext));