Im使用Flink(最近通过git)从kafka流式传输到cassandra。为了简化单元测试,我通过Dagger添加了依赖项注入。
ObjectGraph似乎设置正确,但Flink将"内部对象"标记为"不可序列化"。如果我直接包括这些对象,它们就会起作用——那么有什么区别呢?
有问题的类实现了MapFunction和@Inject一个用于cassandra的模块和一个用于读取配置文件的模块。
有没有一种方法可以构建它,这样我就可以使用后期绑定,或者Flink让这变得不可能了?
编辑:
fwiw-依赖项注入(通过dagger)和RichMapFunction不能共存。Dagger不允许您在其定义中包含任何具有扩展的对象。
此外:
通过Dagger Lazy<t> 也不会序列化。
线程"main"org.apache.flink.api.common.InvalidProgramException异常:对象com.someapp.SaveMap@2e029d61不可序列化
…
引起原因:java.io.NotSerializableException:dagger.internal.LazyBinding$1
在深入探讨问题的细节之前,先了解一下Apache Flink中函数的可串行性背景:
可串行性
ApacheFlink使用Java序列化(Java.io.Serializable)将函数对象(此处为MapFunction
)发送给并行执行它们的工作程序。因此,函数需要是可序列化的:函数可能不包含任何不可序列化的字段,即不是基元(int、long、double…)且未实现java.io.Serializable
的类型。
处理不可序列化构造的典型方法是延迟初始化它们。
惰性初始化
在Flink函数中使用不可序列化类型的一种方法是延迟初始化它们。当函数被序列化以发送时,包含这些类型的字段仍然是null
,并且只有在工作者反序列化函数之后才设置。
-
在Scala中,您可以简单地使用惰性字段,例如
lazy val x = new NonSerializableType()
。NonSerializableType
类型实际上仅在第一次访问变量x
时创建,该变量通常在worker上。因此,该类型可能是不可串行化的,因为当函数被串行化以发送到工作线程时,x
为null。 -
在Java中,如果使函数成为Rich函数,则可以初始化函数的
open()
方法上的不可序列化字段。丰富的函数(如RichMapFunction
)是基本函数(此处为MapFunction
)的扩展版本,可以访问open()
和close()
等生命周期方法。
惰性依赖项注入
我不太熟悉依赖注入,但dagger似乎也提供了类似于懒惰依赖的东西,这可能有助于作为一种变通方法,就像Scala:中的懒惰变量一样
new MapFunction<Long, Long>() {
@Inject Lazy<MyDependency> dep;
public Long map(Long value) {
return dep.get().doSomething(value);
}
}
我也遇到了类似的问题。有两种方法可以不反序列化依赖项。
-
使您的依赖关系保持静态,但这并不总是可能的。它也会打乱你的代码设计。
-
使用瞬态:通过将依赖项声明为瞬态,就意味着它们不是对象持久状态的一部分,不应该是序列化的一部分。
public ClassA implements Serializable{ //class A code here } public ClassB{ //class B code here } public class MySinkFunction implements SinkFunction<MyData> { private ClassA mySerializableDependency; private transient ClassB nonSerializableDependency; }
当您使用外部库时,这一点尤其有用,因为您无法更改外部库的实现以使其可序列化。