当我运行下面的代码时。日志打印:
class scala.collection.mutable.ListBuffer 不包含字段 scala$collection$mutable$ListBuffer$$start 的 setter
类类 scala.collection.mutable.ListBuffer 不能用作 POJO 类型,因为并非所有字段都是有效的 POJO 字段,必须作为 GenericType 进行处理。
法典:
private lazy val schoolDescriptor = new ListStateDescriptor[School]("schoolDescriptor", classOf[School])
context.globalState.getListSate(schoolDescriptor).update(ListBuffer(new School))
类定义:
class School {
var classes: ListBuffer[Class] = ListBuffer()
}
class Class {
var students: ListBuffer[Class] = ListBuffer()
}
class Student {
var name = ""
}
如果 POJO 有 ListBuffer 类型字段,而 ListBuffer 的元素也有 ListBuffer 类型字段,我该怎么办?
评论中已经有一些关于不变性的提示。
一般来说,我也会推荐这样做,因为当你使用 Flink 状态时,通用 API 合约是 如果你更新你的状态对象(学校描述符(,你必须用它调用状态#更新。
这可能适用于堆状态而不调用更新(并不总是由 API 保证(,但不适用于例如 RocksDB 状态后端。 如果使用纯 POJO [1],序列化也会容易得多。
对于非 POJO,一般方法是实现您的自定义 org.apache.flink.api.common.typeutils.TypeSerializer 或注册自定义序列化器 [2] 使用另一个状态描述符构造函数:ListStateDescriptor(字符串名称,类型序列化程序类型序列化程序( 或者重构类以支持开箱即用的序列化 [3]。
从安德烈
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html#rules-for-pojo-types
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/custom_serializers.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html