Scala error: Task not serializable, NotSerializableException



我是scala的初学者,当我运行这段代码时得到Scala error: Task not serializable, NotSerializableException: org.apache.log4j.Logger。我用了@transient lazy valobject PSRecord extends Serializable。然而,这个问题无法解决。

代码在spark上运行

            object PSRecord extends Serializable{
                def main(args: Array[String]): Unit = {
                @transient
                val ss = ''
                val jobName = s"${PSRecord.getClass.getPackage.getName}" 
                @transient lazy val LOGGER: Logger = LogManager.getLogger(jobName)  
                val configuration = (taskConfigObj \ "config").extract[PSConfigNew]
                FeatureThresholdNew.run(ss, date, configuration, LOGGER)
          }
        }
        
        object FeatureThresholdNew extends Serializable {
    
           import org.apache.log4j.{LogManager, Logger}
           
           def run(ss: SparkSession,  LOGGER: Logger): Unit = {
               Map(
                  "train" -> data,
                  "valid" -> data.filter($"hour".isin(hoursValid:_*))
                  ).foreach(
               mData => {
                     val numPartitions = if (mData._1.equals("train")) numTrainPartitions else numValidPartitions
                     val dfData = mData._2.select(...)
                     dfData.rdd.repartition(numPartitions).
                     mapPartitions(
                     partition => {
    
                       partition.map(
                       row => {
                              val features = localFeatureColumns.zipWithIndex.map(
                        col => {
    
                          LOGGER.info(s"#### col: ${col}")
    
    }
    }
    }
}
}
                 
        
          
        
         

在这段代码

def run(ss: SparkSession,  LOGGER: Logger): Unit = {
  // [...]
  dfData.rdd
    .repartition(numPartitions)
    .mapPartitions{ partition =>
      partition.map{ row =>
        val features = localFeatureColumns.zipWithIndex.map{ col =>
          LOGGER.info(s"#### col: ${col}") // *** HERE ***
  }}};
}

您在映射操作中使用LOGGER变量。这意味着必须序列化LOGGER实例并将其发送给工作节点。因此,Logger类需要为Serializable

也许你误解了lazy transient val的工作方式:它并不适用于Logger的实例,而是适用于PSRecord类中的LOGGER字段。因此,如果您序列化PSRecord,它将不会序列化LOGGER字段(然后将具有null值)。

注1:

遵循编码标准,为了清晰,变量应该是小写的。此外,我认为我们通常称SparkSession, sparkSession或简称spark:)

注2:

如果你需要在映射操作中发送变量(类似的),使用广播。它更快,并且使数据的分布在代码中显式。

最新更新