我正在开发一个新的 Flink 流应用程序,无法进行调试以逐步完成代码中更关键的部分。
这是我的主程序(删除了一些部分(:
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.createLocalEnvironment()
env.setStateBackend(new RocksDBStateBackend(statePath))
env.addSource(new KafkaConsumer().getKafkaKeyedConsumer(inTopic, inBrokers))
.map {
tup => (tup._2.get("payload").get("itemId").asText, tup._2.get("payload").get("version").asLong, tup._2, tup._1)}
.keyBy(0)
.flatMap({
new FilterPastVersions()
})
.print()
env.execute("My Program")
这是FilterPastVersions
类:
class FilterPastVersions extends RichFlatMapFunction[(String, Long, ObjectNode, String), (String, ObjectNode)] {
private var version: ValueState[Long] = _
override def flatMap(input: (String, Long, ObjectNode, String), out: Collector[(String, ObjectNode)]): Unit = {
// access the state value
val tmpCurrentVersion = version.value()
// If it hasn't been used before, it will be null
if (tmpCurrentVersion == null || input._2 > tmpCurrentVersion){
version.update(input._2)
out.collect((input._4, input._3))
}
}
override def open(parameters: Configuration): Unit = {
val versionDesc = new ValueStateDescriptor[Long]("version", createTypeInformation[Long])
versionDesc.setQueryable("version-state")
version = getRuntimeContext.getState(versionDesc)
}
}
如果我在 main 函数的每一行放置一个断点,执行确实在每个断点处停止。但是,直到 env.execute
之后才实际处理任何数据,因此这些断点不显示任何执行。
如果我在FilterPastVersions
的flatmap
函数中放置断点,这些断点永远不会命中。该程序确实成功地打印了来自Kafka的消息。
我在这里遗漏了什么,还是这是 Flink 的限制?我正在使用 IntelliJ,并使用远程调试器尝试过此操作,只需单击应用程序配置的调试按钮即可。
这是因为 flink 程序被懒惰地执行了。因此,你的调试会话只会执行 flink 管道的声明部分。流处理本身是在execute()
方法期间完成的。
查看文档。(https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html(
所有 Flink 程序都懒惰地执行:当 程序的主方法被执行,数据加载和 转换不会直接发生。相反,每个操作都是 创建并添加到程序的计划中。操作实际上是 当执行由 execute(( 显式触发时执行 调用执行环境。程序是否执行 本地或集群取决于执行环境的类型
惰性求值允许您构建复杂的程序 Flink 作为一个整体规划单元执行。