Spark Streaming checkpointing throws a Not Serializable exception



We are using the Spark Streaming receiver-based approach, and we have just enabled checkpointing to eliminate the data-loss problem.

The Spark version is 1.6.1, and we consume messages from a Kafka topic.

I am using ssc inside the DStream's foreachRDD method, and that is what throws the Not Serializable exception.

I tried extending the class with Serializable, but the same error still occurs. It happens only when we enable checkpointing.

The code is:

def main(args: Array[String]): Unit = {
  val checkPointLocation = "/path/to/wal"
  val ssc = StreamingContext.getOrCreate(checkPointLocation, () => createContext(checkPointLocation))

  ssc.start()
  ssc.awaitTermination()
}

def createContext(checkPointLocation: String): StreamingContext = {
  val sparkConf = new SparkConf().setAppName("Test")
  sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")

  val ssc = new StreamingContext(sparkConf, Seconds(40))
  ssc.checkpoint(checkPointLocation)

  val sc = ssc.sparkContext
  val sqlContext: SQLContext = new HiveContext(sc)

  val kafkaParams = Map(
    "group.id" -> groupId,
    CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> sasl,
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    "metadata.broker.list" -> brokerList,
    "zookeeper.connect" -> zookeeperURL)

  val dStream = KafkaUtils
    .createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
    .map(_._2)

  dStream.foreachRDD(rdd => {
    // using sparkContext / sqlContext to do any operation throws error.
    // convert RDD[String] to RDD[Row]
    // Create Schema for the RDD.
    sqlContext.createDataFrame(rdd, schema)
  })

  ssc
}

Error log:

2017-02-08 22:53:53,250 ERROR [Driver] streaming.StreamingContext: Error starting the context, marking it as stopped
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@1c5e3677)
- field (class: com.x.payments.RemedyDriver$$anonfun$main$1, name: sc$1, type: class org.apache.spark.SparkContext)
- object (class com.x.payments.RemedyDriver$$anonfun$main$1, )
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, )
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@68866c5)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 16)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@68866c5))
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.kafka.KafkaInputDStream, org.apache.spark.streaming.kafka.KafkaInputDStream@acd8e32)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 16)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.kafka.KafkaInputDStream@acd8e32))
- writeObject data (class: org.apache.spark.streaming.DStreamGraph)
- object (class org.apache.spark.streaming.DStreamGraph, org.apache.spark.streaming.DStreamGraph@6935641e)
- field (class: org.apache.spark.streaming.Checkpoint, name: graph, type: class org.apache.spark.streaming.DStreamGraph)
- object (class org.apache.spark.streaming.Checkpoint, org.apache.spark.streaming.Checkpoint@484bf033)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:557)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at com.x.payments.RemedyDriver$.main(RemedyDriver.scala:104)
at com.x.payments.RemedyDriver.main(RemedyDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:559)
2017-02-08 22:53:53,250 ERROR [Driver] payments.RemedyDriver$: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.apache.spark.SparkContext
Serialization stack:
(same serialization stack as above)
2017-02-08 22:53:53,255 INFO [Driver] yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0

Update:

Basically, what we are trying to do is convert the rdd to a DataFrame inside the DStream's foreachRDD, apply the DataFrame API on it, and finally store the data in Cassandra. So we use sqlContext to convert the rdd to a DataFrame, and that is the point where it throws the error.
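For reference, a minimal sketch of the RDD[String] to RDD[Row] conversion step described above; the single-column schema here is purely hypothetical and only meant to show the shape of the code, since the real schema depends on the Kafka message format:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical schema with one string column; the real job parses each
// Kafka message into multiple typed fields.
val schema = StructType(Seq(StructField("message", StringType, nullable = true)))

// Wrap each raw message in a Row so that createDataFrame(rowRDD, schema) works.
def toRowRDD(rdd: RDD[String]): RDD[Row] = rdd.map(line => Row(line))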

If you want to access the SparkContext, do so through the rdd value:

dStream.foreachRDD(rdd => {
  val sqlContext = new HiveContext(rdd.context)
  val dataFrameSchema = sqlContext.createDataFrame(rdd, schema)
})

Whereas this:

dStream.foreachRDD(rdd => {
  // using sparkContext / sqlContext to do any operation throws error.
  val numRDD = sc.parallelize(1 to 10, 2)
  log.info("NUM RDD COUNT:" + numRDD.count())
})

causes the SparkContext to be serialized as part of the closure, which fails because the SparkContext itself is not serializable.
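Putting the two together, a minimal sketch of what the corrected foreachRDD body could look like, assuming the spark-cassandra-connector is on the classpath and using hypothetical keyspace/table names:

import org.apache.spark.sql.{Row, SQLContext}

dStream.foreachRDD { rdd =>
  // Get the SQLContext from the RDD's own SparkContext instead of capturing the
  // driver-side sqlContext/sc in the closure, so nothing non-serializable is
  // pulled into the checkpointed DStream graph.
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)

  // schema is the StructType built for the message layout (see the sketch above).
  val rowRDD = rdd.map(line => Row(line))
  val df = sqlContext.createDataFrame(rowRDD, schema)

  // Hypothetical keyspace and table; requires the spark-cassandra-connector
  // DataFrame source to be available.
  df.write
    .format("org.apache.spark.sql.cassandra")
    .options(Map("keyspace" -> "my_keyspace", "table" -> "my_table"))
    .mode("append")
    .save()
}

Because the foreachRDD body runs on the driver once per batch, obtaining the contexts from the rdd inside the closure is the usual checkpointing-safe pattern; only the data-processing lambdas passed to the RDD/DataFrame operations are shipped to the executors.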
