I have enabled checkpointing, saving the logs to S3. Spark Streaming works fine when there are no files in the checkpoint directory, and I can see log files appearing in that directory. Then I kill Spark Streaming and restart it. This time, I start getting a NullPointerException on the Spark session. In short: with no log files in the checkpoint directory, Spark Streaming works fine; but as soon as I restart it with log files present in the checkpoint directory, I get a null pointer exception on the Spark session. Here is the code:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object asf {
  val microBatchInterval = 5
  val sparkSession = SparkSession
    .builder()
    .appName("Streaming")
    .getOrCreate()
  val conf = new SparkConf(true)
  //conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val sparkContext = SparkContext.getOrCreate(conf)

  // main method assumed; its signature was not shown in the original snippet
  def main(args: Array[String]): Unit = {
    val checkpointDirectory = "s3a://bucketname/streaming-checkpoint"
    println("Spark session: " + sparkSession)

    // s3Config.getConfig() is a helper (not shown) returning the Hadoop Configuration
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => {
        createStreamingContext(sparkContext, microBatchInterval, checkpointDirectory, sparkSession)
      }, s3Config.getConfig())

    ssc.start()
    ssc.awaitTermination()
  }

  def createStreamingContext(sparkContext: SparkContext, microBatchInterval: Int,
                             checkpointDirectory: String, spark: SparkSession): StreamingContext = {
    println("Spark session inside: " + spark)
    val ssc: org.apache.spark.streaming.StreamingContext = new StreamingContext(sparkContext, Seconds(microBatchInterval))
    //TODO: StorageLevel.MEMORY_AND_DISK_SER
    // EventHubClient is a custom receiver (not shown)
    val lines = ssc.receiverStream(new EventHubClient(StorageLevel.MEMORY_AND_DISK_SER))
    lines.foreachRDD {
      rdd => {
        val df = spark.read.json(rdd)
        df.show()
      }
    }
    ssc.checkpoint(checkpointDirectory)
    ssc
  }
}
Again, the first time I run this code (with no log files in the checkpoint directory), I can see the DataFrame being printed. If I run it with log files already in the checkpoint directory, I don't even see
println("Spark session inside: " + spark)
being printed, and it does print on the first run. The error:
Exception in thread "main" java.lang.NullPointerException
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:111)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:109)
at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:549)
at org.apache.spark.sql.SparkSession.read(SparkSession.scala:605)
And the error is happening at:
val df = spark.read.json(rdd)
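If it helps to explain why the inner println never fires on restart: as far as I understand, the factory passed to StreamingContext.getOrCreate is only invoked when no usable checkpoint exists; otherwise the whole DStream graph, including the foreachRDD closure and the spark reference it captured, is deserialized from the checkpoint files. A marker like the one below (reusing the names from the code above; this is just a diagnostic sketch, not a fix) makes that visible:

// Diagnostic sketch: the lambda only runs when there is no checkpoint to recover from.
// On a restart with log files in the checkpoint directory, "factory invoked" is never
// printed and the checkpointed DStream graph is restored instead.
val recoveredSsc = StreamingContext.getOrCreate(checkpointDirectory, () => {
  println("factory invoked: building a fresh StreamingContext")
  createStreamingContext(sparkContext, microBatchInterval, checkpointDirectory, sparkSession)
})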
EDIT: I added this line:
conf.set("spark.streaming.stopGracefullyOnShutdown","true")
and it made no difference; I am still getting the NullPointerException.
To answer my own question, this works:
lines.foreachRDD {
  rdd => {
    val sqlContext: SQLContext = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate().sqlContext
    val df = sqlContext.read.json(rdd)
    df.show()
  }
}
Getting the Spark session via rdd.sparkContext works.
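Presumably this works because, once the context is recovered from the checkpoint, the deserialized foreachRDD closure no longer holds a usable reference to the session built at startup, so resolving the session at batch time on the driver sidesteps the stale reference. A lazily instantiated singleton, along the lines of the pattern in the Spark Streaming programming guide, makes the same idea a little more explicit (the SparkSessionSingleton name here is just illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Lazily instantiated singleton: the session is resolved at batch time on the driver,
// so nothing serialized into the checkpoint is relied upon inside foreachRDD.
object SparkSessionSingleton {
  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession.builder.config(sparkConf).getOrCreate()
    }
    instance
  }
}

// Usage inside the output operation:
lines.foreachRDD { rdd =>
  val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
  val df = spark.read.json(rdd)
  df.show()
}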
Just to state it explicitly for the benefit of newcomers: this is an anti-pattern. Creating Datasets inside a transformation is not allowed!
As Michel