我试图在火花流应用程序中连接DB2数据库,并且数据库查询执行语句引起的org.apache.spark.spark.sparkexception:任务不是可序列化的"问题"问题。请指教。以下是我有参考的示例代码。
dataLines.foreachRDD{rdd=>
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
val dataRows=rdd.map(rs => rs.value).map(row =>
row.split(",")(1)-> (row.split(",")(0), row.split(",")(1), row.split(",")(2)
, "cvflds_"+row.split(",")(3).toLowerCase, row.split(",")(4), row.split(",")(5), row.split(",")(6))
)
val db2Conn = getDB2Connection(spark,db2ConParams)
dataRows.foreach{ case (k,v) =>
val table = v._4
val dbQuery = s"(SELECT * FROM $table ) tblResult"
val df=getTableData(db2Conn,dbQuery)
df.show(2)
}
}
Below is other function code:
private def getDB2Connection(spark: SparkSession, db2ConParams:scala.collection.immutable.Map[String,String]): DataFrameReader = {
spark.read.format("jdbc").options(db2ConParams)
}
private def getTableData(db2Con: DataFrameReader,tableName: String):DataFrame ={
db2Con.option("dbtable",tableName).load()
}
object SparkSessionSingleton {
@transient private var instance: SparkSession = _
def getInstance(sparkConf: SparkConf): SparkSession = {
if (instance == null) {
instance = SparkSession
.builder
.config(sparkConf)
.getOrCreate()
}
instance
}
}
以下是错误日志:
2018-03-28 22:12:21,487 [jobscheduler]错误org.apache.spark.spark.streaming.scheduler.jobscheduler-错误运行作业流媒体作业15222289540000 MS.0 MS.0org.apache.spark.sparkexception:任务不可序列化 at org.apache.spark.util.closurecleaner $。验证(ClosureCleaner.Scala:298( 请访问org.apache.spark.util.closurecleaner $ .org $ .org $ apache $ spark $ util $ clocrecleaner $$ clean(closurecleaner.scala:288( atorg.apache.spark.util.closurecleaner $ .clean(clocrecleaner.scala:108( atrg.apache.spark.sparkcontext.clean(SparkContext.Scala:2094( atorg.apache.spark.rdd.rdd $$ anonfun $ for for $ 1.apply(rdd.scala:916( at org.apache.spark.rdd.rdd $$ anonfun $ for $ 1.apply(rdd.scala:915( atrg.apache.spark.rdd.rddoperationscope $ .withScope(rddoperationscope.scala:151( atrg.apache.spark.rdd.rddoperationscope $ .withScope(rddoperationscope.scala:112( at org.apache.spark.rdd.rdd.withscope(rdd.scala:362( 在org.apache.spark.rdd.rdd.foreach(rdd.scala:915( 在ncc.org.civil.receiver.db2dataloadtokudu $$ anonfun $ createsparkContext $ 1.apply(db2dataloadtokudu.scala:139(( 在ncc.org.civil.receiver.db2dataloadtokudu $$ anonfun $ createsparkContext $ 1.apply(db2dataloadtokudu.scala:128( at org.apache.spark.streaming.dstream.dstream $$ anonfun $ foreachrdd $ 1 $$ anonfun $ apply $ mcv $ sp $ 3.Apply(dstream.scala:627( at org.apache.spark.streaming.dstream.dstream $$ anonfun $ foreachrdd $ 1 $$ anonfun $ apply $ mcv $ sp $ 3.Apply(dstream.scala:627( 请访问org.apache.spark.streaming.dstream.foreachdstream $$ anonfun $ 1 $$ anonfun $ apply $ mcv $ sp $ 1.Apply $ mcv $ sp(foreachdstream.scala.scala:51( atorg.apache.spark.streaming.dstream.foreachdstream $$ anonfun $ 1 $$ anonfun $ apply $ mcv $ sp $ 1.apply(foreachdstream.scala:51( atorg.apache.spark.streaming.dstream.foreachdstream $$ anonfun $ 1 $$ anonfun $ apply $ mcv $ sp $ 1.apply(foreachdstream.scala:51( 请访问org.apache.spark.streaming.dstream.dstream.createrddwithlocalproperties(dstream.scala:415( atorg.apache.spark.streaming.dstream.foreachdstream $$ anonfun $ 1.Apply $ mcv $ sp(foreachdstream.scala:50( atorg.apache.spark.streaming.dstream.foreachdstream $$ anonfun $ 1.Apply(foreachdstream.scala:50( atorg.apache.spark.streaming.dstream.foreachdstream $$ anonfun $ 1.Apply(foreachdstream.scala:50( 在scala.util.try $ .apply(try.scala:192( atorg.apache.spark.streaming.scheduler.job.run(job.scala:39( atorg.apache.spark.streaming.scheduler.jobscheduler $ jobhandler $$ anonfun $ run $ 1.Apply $ mcv $ sp(jobscheduler.scala:254( 请访问org.apache.spark.streaming.scheduler.jobscheduler $ jobhandler $$ anonfun $ run $ 1.Apply(Jobscheduler.scala:254( 请访问org.apache.spark.streaming.scheduler.jobscheduler $ jobhandler $$ anonfun $ run $ 1.Apply(Jobscheduler.scala:254( 在scala.util.dynamicvariable.withvalue(dynamicvariable.scala:58( 请访问org.apache.spark.streaming.scheduler.jobscheduler $ jobhandler.run(jobscheduler.scala:253( at Java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1145( at Java.util.concurrent.threadpoolexecutor $ worker.run(threadpoolexecutor.java:615( 在java.lang.thread.run(thread.java:745(引起:序列化堆栈: - 对象不是序列化(类:org.apache.spark.sql.dataframereader,value:org.apache.spark.sql.dataframereader@15fdb01( -FIELD(班级:ncc.org.civil.receiver.db2dataloadtokudu $$ $$ anonfun $ createsparkContext $ 1 $ 1 $$ anonfun $ apply $ 2,name:db2conn $ 1,类型:class org.apache.pache.spark.spark.spark.spark.sql.dataframereader( - 对象(ncc.org.civil.civil.receiver.db2dataloadtokudu $$ anonfun $ createsparkContext $ 1 $$ anonfun $ apply $ 2,( atorg.apache.spark.serializer.SerializationDebugger $ .Improveexception(SerializationDebugger.Scala:40( at rog.apache.spark.serializer.javaserializationstream.writeObject(javaserializer.scala:46( atrg.apache.spark.serializer.javaserializerInstance.Serialize(Javaserializer.Scala:100( at rog.apache.spark.util.closurecleaner $。验证(ClosureCleaner.Scala:295( ... 30多
理想情况下,您应该在dataRows.foreach
中保留任何连接对象,因为闭合旨在将其序列化为执行者并在此处运行。这个概念被深入介绍 @此官方链接
在您的情况下,行是导致问题的关闭:
val df=getTableData(db2Conn,dbQuery)
所以,而不是使用火花来加载DB2表,在您的情况下,它会变成(结合方法之后(:
spark.read.format("jdbc").options(db2ConParams).option("dbtable",tableName).load()
在关闭中使用普通的JDBC来实现这一目标。您可以在JDBC代码中使用db2ConParams
。(我认为它足够简单,可以序列化(。该链接还建议使用rdd.foreachPartition
和ConnectionPool
进一步优化。
您尚未提及除df.show(2)
以外的表数据所做的事情。如果行很大,那么您可以讨论有关用例的更多信息。也许,您需要考虑不同的设计。