我是scala的初学者,当我运行这段代码时得到Scala error: Task not serializable, NotSerializableException: org.apache.log4j.Logger
。我用了@transient lazy val
和object PSRecord extends Serializable
。然而,这个问题无法解决。
代码在spark上运行
object PSRecord extends Serializable{
def main(args: Array[String]): Unit = {
@transient
val ss = ''
val jobName = s"${PSRecord.getClass.getPackage.getName}"
@transient lazy val LOGGER: Logger = LogManager.getLogger(jobName)
val configuration = (taskConfigObj \ "config").extract[PSConfigNew]
FeatureThresholdNew.run(ss, date, configuration, LOGGER)
}
}
object FeatureThresholdNew extends Serializable {
import org.apache.log4j.{LogManager, Logger}
def run(ss: SparkSession, LOGGER: Logger): Unit = {
Map(
"train" -> data,
"valid" -> data.filter($"hour".isin(hoursValid:_*))
).foreach(
mData => {
val numPartitions = if (mData._1.equals("train")) numTrainPartitions else numValidPartitions
val dfData = mData._2.select(...)
dfData.rdd.repartition(numPartitions).
mapPartitions(
partition => {
partition.map(
row => {
val features = localFeatureColumns.zipWithIndex.map(
col => {
LOGGER.info(s"#### col: ${col}")
}
}
}
}
}
在这段代码
def run(ss: SparkSession, LOGGER: Logger): Unit = {
// [...]
dfData.rdd
.repartition(numPartitions)
.mapPartitions{ partition =>
partition.map{ row =>
val features = localFeatureColumns.zipWithIndex.map{ col =>
LOGGER.info(s"#### col: ${col}") // *** HERE ***
}}};
}
您在映射操作中使用LOGGER
变量。这意味着必须序列化LOGGER
实例并将其发送给工作节点。因此,Logger
类需要为Serializable
也许你误解了lazy transient val的工作方式:它并不适用于Logger的实例,而是适用于PSRecord
类中的LOGGER
字段。因此,如果您序列化PSRecord
,它将不会序列化LOGGER
字段(然后将具有null值)。
注1:
遵循编码标准,为了清晰,变量应该是小写的。此外,我认为我们通常称SparkSession
, sparkSession
或简称spark
:)
注2:
如果你需要在映射操作中发送变量(类似的),使用广播。它更快,并且使数据的分布在代码中显式。