Spark application gets the error "Task not serializable"?



Why does the following code fail with a "Task not serializable" error?

The error:

线程"main"中的异常 org.apache.spark.SparkException: 任务不可序列化 at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) at org.apache.spark.SparkContext.clean(SparkContext.scala:2101) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.RDD.map(RDD.scala:369) at ConnTest$.main(main.scala:41) at ConnTest.main(main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.lang.reflect.Method.invoke(未知来源) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 原因:java.io.NotSerializableException: DoWork 序列化堆栈: - 对象不可序列化(类:DoWork,值:DoWork@655621fd) - 字段(类:ConnTest$$anonfun$2,名称:doWork$1,类型:类DoWork) - 对象(类 ConnTest$$anonfun$2, ) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) ...另外 20

Code:

import java.time.LocalDate

import org.apache.spark.{SparkConf, SparkContext}

object ConnTest {
  def main(args: scala.Array[String]): Unit = {
    // args(0) is a String; parse it into the LocalDate the jobs below expect
    val date = LocalDate.parse(args(0))
    val conf = new SparkConf()
    val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]"))
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val jdbcSqlConn = "jdbc:sqlserver://......;"
    val listJob = new ItemListJob(sqlContext, jdbcSqlConn)
    val list = listJob.run(date).select("id").rdd.map(r => r(0).asInstanceOf[Int]).collect()
    // It returns about 3000 rows
    val doWork = new DoWork(sqlContext, jdbcSqlConn)
    val processed = sc.parallelize(list).map(d => {
      doWork.run(d, date)
    })
  }
}
class ItemListJob(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(date: LocalDate) = {
    sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"dbo.GetList('$date')"
    )).load()
  }
}
class DoWork(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(id: Int, date: LocalDate) = {
    // ...... read the data from database for id, and create a text file
    val data = sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"someFunction('$id', $date)"
    )).load()
    // .... create a text file with content of data
    (id, date)
  }
}

Update:

I changed the .map() call to the following,

val processed = sc.parallelize(dealList).toDF.map(d => {
  doWork.run(d(0).asInstanceOf[Int], rc)
})

and now I got the error:

线程"main"中的异常 java.lang.UnsupportedOperationException: 找不到 java.time.LocalDate 的编码器 - 字段(类:"java.time.LocalDate",名称:"_2") - 根类:"斯卡拉。元组2" at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.immutable.List.foreach(List.scala:381)

The problem is in the following closure:

val processed = sc.parallelize(list).map(d => {
  doWork.run(d, date)
})

The closure in map will run on the executors, so Spark needs to serialize doWork and send it to the executors, which means DoWork must be serializable. However, DoWork contains sc and sqlContext, so you cannot just make DoWork implement Serializable, because you cannot use them on the executors anyway.
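If the per-id work really must run on the executors, one common way out is to restructure the worker so it holds only serializable state (here, the JDBC connection string) and opens a plain JDBC connection on the executor instead of going through sqlContext. A minimal sketch under that assumption — the SerializableWork name and its query are illustrative, not the asker's actual code:

import java.sql.DriverManager

// Holds nothing but the connection string, so the whole object
// can safely be shipped to the executors.
class SerializableWork(jdbcSqlConn: String) extends Serializable {
  def run(id: Int, date: String): (Int, String) = {
    // Opened on the executor; no SparkContext/SQLContext is captured.
    val conn = DriverManager.getConnection(jdbcSqlConn)
    try {
      val stmt = conn.prepareStatement("SELECT * FROM someFunction(?, ?)") // hypothetical query
      stmt.setInt(1, id)
      stmt.setString(2, date)
      val rs = stmt.executeQuery()
      // ... consume rs and write the text file here ...
      (id, date)
    } finally conn.close()
  }
}

The driver would then call sc.parallelize(list).map(d => work.run(d, date.toString)), shipping only the connection string with each task.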

I guess you probably want to save the data to the database inside DoWork. If so, you can convert the RDD to a DataFrame and save it via the jdbc method, such as:

sc.parallelize(list).toDF.write.jdbc(...)
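For concreteness, a sketch of what that call could look like — the column name id and the target table dbo.Results are placeholders, and it assumes Spark 2.x with sqlContext.implicits._ in scope so toDF resolves:

import java.util.Properties

val props = new Properties()
props.setProperty("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")

// "dbo.Results" is a placeholder target table.
sc.parallelize(list)
  .toDF("id")
  .write
  .jdbc(jdbcSqlConn, "dbo.Results", props)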

I can't give you more suggestions since you didn't provide the code inside DoWork.
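Regarding the "No Encoder found for java.time.LocalDate" error in your update: Spark 2.x has no built-in encoder for java.time.LocalDate, so a Tuple2 containing one cannot be encoded. A minimal workaround sketch, assuming the date just needs to ride along with each row (and sqlContext.implicits._ is in scope), is to convert it to java.sql.Date (or a String) first:

import java.sql.Date

// java.sql.Date has a built-in encoder, unlike java.time.LocalDate.
val sqlDate = Date.valueOf(date) // date: java.time.LocalDate

val processed = sc.parallelize(list)
  .map(d => (d, sqlDate)) // (Int, java.sql.Date) is encodable
  .toDF("id", "date")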
