Spark Streaming nested execution serialization issue



I am trying to connect to a DB2 database from a Spark Streaming application, and the database query execution raises an "org.apache.spark.SparkException: Task not serializable" error. Please advise. Below is the sample code for reference.

        dataLines.foreachRDD{rdd=>
          val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
          val dataRows=rdd.map(rs => rs.value).map(row =>
            row.split(",")(1)-> (row.split(",")(0), row.split(",")(1), row.split(",")(2)
              , "cvflds_"+row.split(",")(3).toLowerCase, row.split(",")(4), row.split(",")(5), row.split(",")(6))
          )
          val db2Conn = getDB2Connection(spark,db2ConParams)
          dataRows.foreach{ case (k,v) =>
              val table = v._4
              val dbQuery = s"(SELECT * FROM $table ) tblResult"
              val df=getTableData(db2Conn,dbQuery)
              df.show(2)
          }
        }

Below is the rest of the relevant code:
  private def getDB2Connection(spark: SparkSession, db2ConParams:scala.collection.immutable.Map[String,String]): DataFrameReader = {
      spark.read.format("jdbc").options(db2ConParams)
  }
  private def getTableData(db2Con: DataFrameReader,tableName: String):DataFrame ={
      db2Con.option("dbtable",tableName).load()
  }

object SparkSessionSingleton {
  @transient  private var instance: SparkSession = _
  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}

Below is the error log:

    2018-03-28 22:12:21,487 [JobScheduler] ERROR org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming job 1522289540000 ms.0
    org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:915)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:915)
        at ncc.org.civil.receiver.DB2DataLoadToKudu$$anonfun$createSparkContext$1.apply(DB2DataLoadToKudu.scala:139)
        at ncc.org.civil.receiver.DB2DataLoadToKudu$$anonfun$createSparkContext$1.apply(DB2DataLoadToKudu.scala:128)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.io.NotSerializableException: org.apache.spark.sql.DataFrameReader
    Serialization stack:
        - object not serializable (class: org.apache.spark.sql.DataFrameReader, value: org.apache.spark.sql.DataFrameReader@15fdb01)
        - field (class: ncc.org.civil.receiver.DB2DataLoadToKudu$$anonfun$createSparkContext$1$$anonfun$apply$2, name: db2Conn$1, type: class org.apache.spark.sql.DataFrameReader)
        - object (class ncc.org.civil.receiver.DB2DataLoadToKudu$$anonfun$createSparkContext$1$$anonfun$apply$2, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
        ... 30 more

Ideally, you should keep the closure in dataRows.foreach free of any connection objects, because the closure is meant to be serialized to the executors and run there. This concept is covered in depth at this official link.

In your case, the following line is the closure that is causing the issue:

val df=getTableData(db2Conn,dbQuery)

So, instead of using Spark to load the DB2 table, which in your case (after combining the methods) becomes:

spark.read.format("jdbc").options(db2ConParams).option("dbtable",tableName).load()

use plain JDBC inside the closure to achieve this. You can use db2ConParams in the JDBC code (I assume the map is simple enough to be serializable). The link also suggests using rdd.foreachPartition and a ConnectionPool to optimize further, as sketched below.
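A minimal sketch of that approach, assuming db2ConParams holds the usual JDBC option keys ("driver", "url", "user", "password") that the spark.read.format("jdbc") call already uses (adjust the key names if your map differs):

    import java.sql.DriverManager

    dataRows.foreachPartition { partition =>
      // The connection is created on the executor, inside the closure, so no
      // DataFrameReader or Connection object needs to be serialized from the driver.
      Class.forName(db2ConParams("driver"))   // assumed key name
      val conn = DriverManager.getConnection(
        db2ConParams("url"), db2ConParams("user"), db2ConParams("password"))
      try {
        partition.foreach { case (k, v) =>
          val table = v._4
          val stmt = conn.createStatement()
          // FETCH FIRST 2 ROWS ONLY roughly mirrors the df.show(2) in the original code.
          val rs = stmt.executeQuery(s"SELECT * FROM $table FETCH FIRST 2 ROWS ONLY")
          try {
            while (rs.next()) {
              println(rs.getString(1))  // replace with whatever you actually do per row
            }
          } finally {
            rs.close()
            stmt.close()
          }
        }
      } finally {
        conn.close()
      }
    }

Opening one connection per partition (or borrowing from a ConnectionPool, as the link suggests) avoids the cost of a new connection per record.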

You have not mentioned what you do with the table data other than df.show(2). If the rows are large, you could share more details about your use case; perhaps you need to consider a different design, one possibility of which is sketched below.
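For instance, if the only thing each record contributes to the query is the table name, one hedged alternative (just a sketch under that assumption, not necessarily the right design for your workload) is to collect the distinct table names to the driver for each batch and load them there with the combined spark.read call shown above, so the DataFrameReader is never captured by an executor-side closure:

    dataLines.foreachRDD { rdd =>
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)

      // Derive the table names on the executors, then bring only the small
      // set of distinct names back to the driver.
      val tables = rdd.map(rs => rs.value)
        .map(row => "cvflds_" + row.split(",")(3).toLowerCase)
        .distinct()
        .collect()

      // spark.read (DataFrameReader) is used only on the driver, so nothing
      // non-serializable ends up inside an executor closure.
      tables.foreach { table =>
        val df = spark.read.format("jdbc")
          .options(db2ConParams)
          .option("dbtable", s"(SELECT * FROM $table) tblResult")
          .load()
        df.show(2)
      }
    }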
