LinkedHashMap 更改为 HashMap 并在 flink 数据流运算符中崩溃



给定这个虚拟代码:

1 case class MyObject(values:mutable.LinkedHashMap[String, String])
...
2    implicit val typeInfoString:TypeInformation[String] = TypeInformation.of(classOf[String])
3    implicit val typeInfoMyObject:TypeInformation[MyObject] = TypeInformation.of(classOf[MyObject])
4
5    val env = StreamExecutionEnvironment.getExecutionEnvironment
6
7    env
8      .fromElements("one")
9      .map(str =>
10      {
11        val obj = MyObject(mutable.LinkedHashMap("key" -> str))
12        val filteredMap1:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
13
14        obj
15      })
16      .map(obj =>
17      {
18        val filteredMap2:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
19
20        obj
21      })

应用程序将在第 18 行崩溃,但以下情况除外:

Caused by: java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.LinkedHashMap

问题似乎是通过序列化/反序列化,values成员更改其对象类型,或者换句话说,LinkedHashMap变为HashMap

请注意,与第 18 行相同的代码在第 12 行中完美运行。

将断点设置为第 12 行时,调试器/IntelliJ 将obj.values显示为LinkedHashMap,但第 18 行中的断点将在调试器中显示为HashMapobj.values

这是怎么回事?我该如何解决这个问题?毕竟,LinkedHashMap实现了Serializable?!

LinkedHashMap的默认 Kryo Chill 序列化程序不保留映射类型,而是将数据反序列化为HashMap。为了避免这种情况,需要为LinkedHashMap类型创建一个序列化程序:

class LinkedHashMapSerializer[K, V] extends Serializer[mutable.LinkedHashMap[K, V]] with Serializable {
override def write(kryo: Kryo, output: Output, `object`: mutable.LinkedHashMap[K, V]): Unit = {
kryo.writeObject(output, `object`.size)
for (elem <- `object`.iterator) {
kryo.writeClassAndObject(output, elem._1)
kryo.writeClassAndObject(output, elem._2)
}
}
override def read(kryo: Kryo, input: Input, `type`: Class[mutable.LinkedHashMap[K, V]]): mutable.LinkedHashMap[K, V] = {
val result = new mutable.LinkedHashMap[K, V]()
val size = kryo.readObject(input, classOf[Int])
for (_ <- 1 to size) {
val key = kryo.readClassAndObject(input).asInstanceOf[K]
val value = kryo.readClassAndObject(input).asInstanceOf[V]
result.put(key, value)
}
result
}
}

然后将其注册为 KryoSerializer

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerTypeWithKryoSerializer(classOf[mutable.LinkedHashMap[String, String]], new LinkedHashMapSerializer())

相关内容

  • 没有找到相关文章

最新更新