给定这个虚拟代码:
1 case class MyObject(values:mutable.LinkedHashMap[String, String])
...
2 implicit val typeInfoString:TypeInformation[String] = TypeInformation.of(classOf[String])
3 implicit val typeInfoMyObject:TypeInformation[MyObject] = TypeInformation.of(classOf[MyObject])
4
5 val env = StreamExecutionEnvironment.getExecutionEnvironment
6
7 env
8 .fromElements("one")
9 .map(str =>
10 {
11 val obj = MyObject(mutable.LinkedHashMap("key" -> str))
12 val filteredMap1:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
13
14 obj
15 })
16 .map(obj =>
17 {
18 val filteredMap2:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
19
20 obj
21 })
应用程序将在第 18 行崩溃,但以下情况除外:
Caused by: java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.LinkedHashMap
问题似乎是通过序列化/反序列化,values
成员更改其对象类型,或者换句话说,LinkedHashMap
变为HashMap
。
请注意,与第 18 行相同的代码在第 12 行中完美运行。
将断点设置为第 12 行时,调试器/IntelliJ 将obj.values
显示为LinkedHashMap
,但第 18 行中的断点将在调试器中显示为HashMap
obj.values
。
这是怎么回事?我该如何解决这个问题?毕竟,LinkedHashMap
实现了Serializable
?!
LinkedHashMap
的默认 Kryo Chill 序列化程序不保留映射类型,而是将数据反序列化为HashMap
。为了避免这种情况,需要为LinkedHashMap
类型创建一个序列化程序:
class LinkedHashMapSerializer[K, V] extends Serializer[mutable.LinkedHashMap[K, V]] with Serializable {
override def write(kryo: Kryo, output: Output, `object`: mutable.LinkedHashMap[K, V]): Unit = {
kryo.writeObject(output, `object`.size)
for (elem <- `object`.iterator) {
kryo.writeClassAndObject(output, elem._1)
kryo.writeClassAndObject(output, elem._2)
}
}
override def read(kryo: Kryo, input: Input, `type`: Class[mutable.LinkedHashMap[K, V]]): mutable.LinkedHashMap[K, V] = {
val result = new mutable.LinkedHashMap[K, V]()
val size = kryo.readObject(input, classOf[Int])
for (_ <- 1 to size) {
val key = kryo.readClassAndObject(input).asInstanceOf[K]
val value = kryo.readClassAndObject(input).asInstanceOf[V]
result.put(key, value)
}
result
}
}
然后将其注册为 KryoSerializer
:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerTypeWithKryoSerializer(classOf[mutable.LinkedHashMap[String, String]], new LinkedHashMapSerializer())