flink-使用匕首注入-不可序列化



Im使用Flink(最近通过git)从kafka流式传输到cassandra。为了简化单元测试,我通过Dagger添加了依赖项注入。

ObjectGraph似乎设置正确,但Flink将"内部对象"标记为"不可序列化"。如果我直接包括这些对象,它们就会起作用——那么有什么区别呢?

有问题的类实现了MapFunction@Inject一个用于cassandra的模块和一个用于读取配置文件的模块。

有没有一种方法可以构建它,这样我就可以使用后期绑定,或者Flink让这变得不可能了?


编辑:

fwiw-依赖项注入(通过dagger)和RichMapFunction不能共存。Dagger不允许您在其定义中包含任何具有扩展的对象。

此外:

通过Dagger Lazy<t> 也不会序列化。

线程"main"org.apache.flink.api.common.InvalidProgramException异常:对象com.someapp.SaveMap@2e029d61不可序列化

引起原因:java.io.NotSerializableException:dagger.internal.LazyBinding$1

在深入探讨问题的细节之前,先了解一下Apache Flink中函数的可串行性背景:

可串行性

ApacheFlink使用Java序列化(Java.io.Serializable)将函数对象(此处为MapFunction)发送给并行执行它们的工作程序。因此,函数需要是可序列化的:函数可能不包含任何不可序列化的字段,即不是基元(int、long、double…)且未实现java.io.Serializable的类型。

处理不可序列化构造的典型方法是延迟初始化它们。

惰性初始化

在Flink函数中使用不可序列化类型的一种方法是延迟初始化它们。当函数被序列化以发送时,包含这些类型的字段仍然是null,并且只有在工作者反序列化函数之后才设置。

  • 在Scala中,您可以简单地使用惰性字段,例如lazy val x = new NonSerializableType()NonSerializableType类型实际上仅在第一次访问变量x时创建,该变量通常在worker上。因此,该类型可能是不可串行化的,因为当函数被串行化以发送到工作线程时,x为null。

  • 在Java中,如果使函数成为Rich函数,则可以初始化函数的open()方法上的不可序列化字段。丰富的函数(如RichMapFunction)是基本函数(此处为MapFunction)的扩展版本,可以访问open()close()等生命周期方法。

惰性依赖项注入

我不太熟悉依赖注入,但dagger似乎也提供了类似于懒惰依赖的东西,这可能有助于作为一种变通方法,就像Scala:中的懒惰变量一样

new MapFunction<Long, Long>() {
  @Inject Lazy<MyDependency> dep;
  public Long map(Long value) {
    return dep.get().doSomething(value);
  }
}

我也遇到了类似的问题。有两种方法可以不反序列化依赖项。

  1. 使您的依赖关系保持静态,但这并不总是可能的。它也会打乱你的代码设计。

  2. 使用瞬态:通过将依赖项声明为瞬态,就意味着它们不是对象持久状态的一部分,不应该是序列化的一部分。

public ClassA implements Serializable{
  //class A code here
}
public ClassB{
  //class B code here
}
public class MySinkFunction implements SinkFunction<MyData> {
  private ClassA mySerializableDependency;
  private transient ClassB nonSerializableDependency;
}

当您使用外部库时,这一点尤其有用,因为您无法更改外部库的实现以使其可序列化。

相关内容

  • 没有找到相关文章