Spark - 如何使地图可序列化



我需要从一个大数据集中提取和转换一些信息,这些信息稍后将被其他数据集使用。

由于要消耗的信息始终相同,并且由于它可以以对值方式存储,因此我正在考虑仅将此信息保存在将由 udf 使用的查看地图中,因此我避免了对大数据集的多次调用。

问题是我收到以下错误:

org.apache.spark.SparkException: Task not serializable

有没有办法使地图可序列化?

如果不可能,有没有另一种方法可以在 Spark 中的查看对象中存储信息?

这是我的代码:

val cityTimeZone: scala.collection.immutable.Map[String,Double]  = Map("CEB" -> 8.0, "LGW" -> 0.0, "CPT" -> 2.0
, "MUC" -> 1.0, "SGN" -> 7.0, "BNE" -> 10.0, "DME" -> 3.0, "FJR" -> 4.0, "BAH" -> 3.0, "ARN" -> 1.0, "FCO" -> 1.0, "DUS" -> 1.0, "MRU" -> 4.0, "JFK" -> -5.0, "GLA" -> 0.0)
def getLocalHour = udf ((city:String, timeutc:Int) => {
val timeOffset = cityTimeZone(city)
val localtime = if((timeutc+timeOffset)%24 >= 0)(timeutc+timeOffset)%24 else ((timeutc+timeOffset)%24)*(-1)
localtime
})
//$"dateutc" is a timestamp column like this: 2017-03-01 03:45:00 and $"city" a 3 letters code in capitals, like those in the map above
val newDF = DF
.select("dateutc","city")
.withColumn("utchour", hour($"dateutc"))
.withColumn("localhour", getLocalHour($"city", $"utchour"))
display(newDF)

成员变量声明

val cityTimeZone  

结合

cityTimeZone(city)

udf内部是有问题的,因为后者只是一个快捷方式

this.cityTimeZone(city)

其中this(大概(是某个巨大的不可序列化对象(可能是因为它包含对不可序列化 Spark 上下文的引用(。

使getLocalHour成为lazy val,并将udf所需的映射移动到getLocalHour作为局部变量的定义中,大致如下:

lazy val getLocalHour = {
val cityTimeZone: Map[String, Double] = Map("CEB" -> 8.0, "LGW" -> 0.0)
udf ((city:String, timeutc:Int) => {
val timeOffset = cityTimeZone(city)
val localtime = if((timeutc+timeOffset)%24 >= 0)(timeutc+timeOffset)%24 else ((timeutc+timeOffset)%24)*(-1)
localtime
})
}

或者,将cityTimeZone附加到一些可序列化的对象(即一些不包含对任何线程、套接字、Spark 上下文和所有其他不可序列化内容的引用的对象;例如,带有实用程序方法和常量的包对象就可以了(。

如果udf定义包含对任何其他成员变量的引用,请相应地处理这些变量。

似乎人们仍在解决这个问题。Andrey 的回答帮助我支持它们,但现在我可以为org.apache.spark.SparkException: Task not serializable提供一个更通用的解决方案,即不要将驱动程序中的变量声明为"全局变量",以便以后在执行器中访问它们。

所以我在这里犯的错误是在驱动程序中声明映射cityTimeZone,但后来我计划在 udf 中使用,该计算将在执行器中发生。

可能的解决方案是将cityTimeZone作为 udfgetLocalHour中的第三个参数传递,或者在cityTimeZone中声明该映射

最新更新