在 Flink 中有用的调试



我正在开发一个新的 Flink 流应用程序,无法进行调试以逐步完成代码中更关键的部分。

这是我的主程序(删除了一些部分(:

def main(args: Array[String]) {
val env = StreamExecutionEnvironment.createLocalEnvironment()
env.setStateBackend(new RocksDBStateBackend(statePath))

env.addSource(new KafkaConsumer().getKafkaKeyedConsumer(inTopic, inBrokers))
    .map {
      tup => (tup._2.get("payload").get("itemId").asText, tup._2.get("payload").get("version").asLong, tup._2, tup._1)}
    .keyBy(0)
    .flatMap({
      new FilterPastVersions()
    })
      .print()

env.execute("My Program")

这是FilterPastVersions类:

class FilterPastVersions extends RichFlatMapFunction[(String, Long, ObjectNode, String), (String, ObjectNode)] {
  private var version: ValueState[Long] = _
  override def flatMap(input: (String, Long, ObjectNode, String), out: Collector[(String, ObjectNode)]): Unit = {
    // access the state value
    val tmpCurrentVersion = version.value()
    // If it hasn't been used before, it will be null
    if (tmpCurrentVersion == null || input._2 > tmpCurrentVersion){
      version.update(input._2)
      out.collect((input._4, input._3))
    }
  }
  override def open(parameters: Configuration): Unit = {
    val versionDesc = new ValueStateDescriptor[Long]("version", createTypeInformation[Long])
    versionDesc.setQueryable("version-state")
    version = getRuntimeContext.getState(versionDesc)
  }
}

如果我在 main 函数的每一行放置一个断点,执行确实在每个断点处停止。但是,直到 env.execute 之后才实际处理任何数据,因此这些断点不显示任何执行。

如果我在FilterPastVersionsflatmap函数中放置断点,这些断点永远不会命中。该程序确实成功地打印了来自Kafka的消息。

我在这里遗漏了什么,还是这是 Flink 的限制?我正在使用 IntelliJ,并使用远程调试器尝试过此操作,只需单击应用程序配置的调试按钮即可。

这是因为 flink 程序被懒惰地执行了。因此,你的调试会话只会执行 flink 管道的声明部分。流处理本身是在execute()方法期间完成的。

查看文档。(https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html(

所有 Flink 程序都懒惰地执行:当 程序的主方法被执行,数据加载和 转换不会直接发生。相反,每个操作都是 创建并添加到程序的计划中。操作实际上是 当执行由 execute(( 显式触发时执行 调用执行环境。程序是否执行 本地或集群取决于执行环境的类型

惰性求值允许您构建复杂的程序 Flink 作为一个整体规划单元执行。

相关内容

  • 没有找到相关文章

最新更新