如何调试Flink中的可序列化例外



我遇到了几个序列化例外,我在Flink的Internet和Doc上进行了一些搜索;有一些著名的解决方案,例如瞬态,扩展可序列化等。每次异常的起源非常明确,但是就我而言,我找不到序列化的地方。

Q:我应该如何调试这种例外?

a.scala:

class executor ( val sink: SinkFunction[List[String]] {
    def exe(): Unit = {
        xxx.....addSink(sinks)
    }
}

B.Scala:

class Main extends App {
  def createSink: SinkFunction[List[String]] = new StringSink()
  object StringSink {
    // static
    val stringList: List[String] = List()
  }
  // create a testing sink
  class StringSink extends SinkFunction[List[String]] {
    override def invoke(strs: List[String]): Unit = {
        // add strs into the variable "stringList" of the compagin object StringSink
    }
  }
  new executor(createSink()).exe()
  // then do somethings with the strings
}

例外是:

实现 sindfunction 是不可序列化的。这 对象可能包含或引用不可序列化字段。

我发现的两个可疑点:

  1. StringSink的实例传递到另一个文件中。
  2. StringSink的类中,它使用静态变量stringList其compagin对象。

我面临类似的问题。它过去需要长期找出哪些成员/对象是不可序列化的。例外日志并没有真正有帮助的。

帮助我的是以下JVM选项,该选项可以在异常跟踪中启用更多详细信息。

启用此选项...

-dsun.io.serialization.extendedDebuginfo = true

我的第一个猜测是您在stringsink中没有没有参数的构造函数

POJO类型的规则从这里剪辑

Flink将数据类型识别为POJO类型(并允许" by-Name"字段引用),如果满足以下条件:

  1. 班级是公共和独立的(没有非静态内部类)
  2. 班级有一个公共无题构造函数
  3. 班级(和所有超类)中的所有非静态,非传播字段均为公共(和非最佳),或者具有公共getter-和setter-方法,遵循Java bean的命名惯例,以获取Getters和Getters和Getters和设定器。

只需添加一个no参数构造函数,然后重试

    class StringSink extends SinkFunction[List[String]] {
        public StringSink() {
        }
        
        @override def invoke(strs: List[String]): Unit = {
            // add strs into the variable "stringList" of the compagin object StringSink
        }
}

相关内容

  • 没有找到相关文章

最新更新