I implemented a ProcessFunction that uses a Guava cache to filter an incoming event stream. The code looks like this:
object myJob {
  private def updateCache(cacheObject, someValue) = {}
  private def getCacheValue(cacheObject, someKey) = {}

  override def run(params, executionEnv) = {
    val inputStream = executionEnv.stream
    val c = CacheBuilder.newBuilder()
    val outStream = inputStream.process(new ProcessFunction() {
      updateCache()
      getCacheValue
    })
  }
}
When I submit the job to Flink, I get the following error:
Caused by: org.apache.flink.api.common.InvalidProgramException: The implementation of the ProcessFunction is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1560)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
at org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:666)
at org.apache.flink.streaming.api.scala.DataStream.process(DataStream.scala:686)
Any idea what I am doing wrong? How can I fix this serialization error?
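The error means that your ProcessFunction references an object that Flink cannot serialize. Before the job is submitted, Flink serializes each user function so it can ship it to the task managers, and here the anonymous ProcessFunction captures c, a Guava CacheBuilder, which does not implement Serializable. In the code you posted, declaring the cache as lazy should fix the problem, because it is then created only on first access inside the running task instead of being serialized together with the function: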
lazy val c = CacheBuilder.newBuilder()
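To see why laziness helps, here is a minimal sketch that uses only JDK serialization, with no Flink or Guava on the classpath. It roughly mimics the check behind `ClosureCleaner.clean` in the stack trace: the function object is Java-serialized, and serialization fails if it holds a non-serializable field. `FakeCache`, `EagerDedup`, `LazyDedup` and `SerializationCheck` are hypothetical names invented for this illustration, and `FakeCache` merely stands in for the Guava cache. The sketch uses a `@transient lazy val` field rather than the local `lazy val` above. A plain `lazy val` only works as long as nothing touches it before submission. `@transient` additionally guarantees the value is never serialized, so it is rebuilt on first access after deserialization.

```scala
import java.io._
import scala.collection.mutable

// Hypothetical stand-in for a Guava cache: deliberately does NOT implement Serializable.
final class FakeCache {
  private val entries = mutable.Map.empty[String, String]
  def getIfPresent(key: String): Option[String] = entries.get(key)
  def put(key: String, value: String): Unit = entries.update(key, value)
}

// Eager field: the cache is built on the client and must be serialized with the function.
class EagerDedup extends Serializable {
  val cache = new FakeCache
  // Returns true the first time a key is seen, false afterwards.
  def firstSeen(key: String): Boolean = {
    val isNew = cache.getIfPresent(key).isEmpty
    if (isNew) cache.put(key, key)
    isNew
  }
}

// @transient lazy field: skipped by serialization, created on first access afterwards.
class LazyDedup extends Serializable {
  @transient lazy val cache = new FakeCache
  def firstSeen(key: String): Boolean = {
    val isNew = cache.getIfPresent(key).isEmpty
    if (isNew) cache.put(key, key)
    isNew
  }
}

object SerializationCheck {
  // Serialize and deserialize obj with plain Java serialization.
  // Returns Left if some field of obj is not serializable.
  def roundTrip[T](obj: T): Either[Throwable, T] =
    try {
      val buffer = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(buffer)
      out.writeObject(obj)
      out.close()
      val loader = getClass.getClassLoader
      val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray)) {
        // Resolve classes with the loader that loaded this code (helps under sbt or REPL loaders).
        override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
          Class.forName(desc.getName, false, loader)
      }
      try Right(in.readObject().asInstanceOf[T]) finally in.close()
    } catch {
      case e: NotSerializableException => Left(e)
    }

  def main(args: Array[String]): Unit = {
    // The eager version fails just like the job in the question.
    println(roundTrip(new EagerDedup))
    // The lazy version survives the round trip and builds its cache on first use.
    roundTrip(new LazyDedup) match {
      case Right(dedup) => println(s"first 'a': ${dedup.firstSeen("a")}, second 'a': ${dedup.firstSeen("a")}")
      case Left(error)  => println(s"unexpected: $error")
    }
  }
}
```

In the real job the same pattern would apply to a field of the ProcessFunction itself, for example `@transient lazy val cache = CacheBuilder.newBuilder().build[String, String]()`. Creating the cache in the function's `open()` method is another common way to keep it out of the serialized function.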
In general, for situations like this you should consult Flink's documentation, which explains this problem.