Scala ListBuffer 不能在 Flink 中用作 POJO 类型



当我运行下面的代码时。日志打印:

class scala.collection.mutable.ListBuffer 不包含字段 scala$collection$mutable$ListBuffer$$start 的 setter

类类 scala.collection.mutable.ListBuffer 不能用作 POJO 类型,因为并非所有字段都是有效的 POJO 字段,必须作为 GenericType 进行处理。

法典:

private lazy val schoolDescriptor = new ListStateDescriptor[School]("schoolDescriptor", classOf[School])

context.globalState.getListSate(schoolDescriptor).update(ListBuffer(new School))

类定义:

class School {
var classes: ListBuffer[Class] = ListBuffer()
}
class Class {
var students: ListBuffer[Class] = ListBuffer()
}
class Student {
var name = ""
}

如果 POJO 有 ListBuffer 类型字段,而 ListBuffer 的元素也有 ListBuffer 类型字段,我该怎么办?

评论中已经有一些关于不变性的提示。

一般来说,我也会推荐这样做,因为当你使用 Flink 状态时,通用 API 合约是 如果你更新你的状态对象(学校描述符(,你必须用它调用状态#更新。

这可能适用于堆状态而不调用更新(并不总是由 API 保证(,但不适用于例如 RocksDB 状态后端。 如果使用纯 POJO [1],序列化也会容易得多。

对于非 POJO,一般方法是实现您的自定义 org.apache.flink.api.common.typeutils.TypeSerializer 或注册自定义序列化器 [2] 使用另一个状态描述符构造函数:ListStateDescriptor(字符串名称,类型序列化程序类型序列化程序( 或者重构类以支持开箱即用的序列化 [3]。

从安德烈

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html#rules-for-pojo-types

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/custom_serializers.html

[3] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html

相关内容

  • 没有找到相关文章

最新更新