我需要从一个大数据集中提取和转换一些信息,这些信息稍后将被其他数据集使用。
由于要消耗的信息始终相同,并且由于它可以以对值方式存储,因此我正在考虑仅将此信息保存在将由 udf 使用的查看地图中,因此我避免了对大数据集的多次调用。
问题是我收到以下错误:
org.apache.spark.SparkException: Task not serializable
有没有办法使地图可序列化?
如果不可能,有没有另一种方法可以在 Spark 中的查看对象中存储信息?
这是我的代码:
val cityTimeZone: scala.collection.immutable.Map[String,Double] = Map("CEB" -> 8.0, "LGW" -> 0.0, "CPT" -> 2.0
, "MUC" -> 1.0, "SGN" -> 7.0, "BNE" -> 10.0, "DME" -> 3.0, "FJR" -> 4.0, "BAH" -> 3.0, "ARN" -> 1.0, "FCO" -> 1.0, "DUS" -> 1.0, "MRU" -> 4.0, "JFK" -> -5.0, "GLA" -> 0.0)
def getLocalHour = udf ((city:String, timeutc:Int) => {
val timeOffset = cityTimeZone(city)
val localtime = if((timeutc+timeOffset)%24 >= 0)(timeutc+timeOffset)%24 else ((timeutc+timeOffset)%24)*(-1)
localtime
})
//$"dateutc" is a timestamp column like this: 2017-03-01 03:45:00 and $"city" a 3 letters code in capitals, like those in the map above
val newDF = DF
.select("dateutc","city")
.withColumn("utchour", hour($"dateutc"))
.withColumn("localhour", getLocalHour($"city", $"utchour"))
display(newDF)
成员变量声明
val cityTimeZone
结合
cityTimeZone(city)
udf
内部是有问题的,因为后者只是一个快捷方式
this.cityTimeZone(city)
其中this
(大概(是某个巨大的不可序列化对象(可能是因为它包含对不可序列化 Spark 上下文的引用(。
使getLocalHour
成为lazy val
,并将udf
所需的映射移动到getLocalHour
作为局部变量的定义中,大致如下:
lazy val getLocalHour = {
val cityTimeZone: Map[String, Double] = Map("CEB" -> 8.0, "LGW" -> 0.0)
udf ((city:String, timeutc:Int) => {
val timeOffset = cityTimeZone(city)
val localtime = if((timeutc+timeOffset)%24 >= 0)(timeutc+timeOffset)%24 else ((timeutc+timeOffset)%24)*(-1)
localtime
})
}
或者,将cityTimeZone
附加到一些可序列化的对象(即一些不包含对任何线程、套接字、Spark 上下文和所有其他不可序列化内容的引用的对象;例如,带有实用程序方法和常量的包对象就可以了(。
如果udf
定义包含对任何其他成员变量的引用,请相应地处理这些变量。
似乎人们仍在解决这个问题。Andrey 的回答帮助我支持它们,但现在我可以为org.apache.spark.SparkException: Task not serializable
提供一个更通用的解决方案,即不要将驱动程序中的变量声明为"全局变量",以便以后在执行器中访问它们。
所以我在这里犯的错误是在驱动程序中声明映射cityTimeZone
,但后来我计划在 udf 中使用,该计算将在执行器中发生。
可能的解决方案是将cityTimeZone
作为 udfgetLocalHour
中的第三个参数传递,或者在cityTimeZone
中声明该映射