Flink:执行完成后的访问算子状态



假设我有一个具有一些原始状态的自定义 RichFunction。当 flink 作业结束时,如何使状态(来自运算符的每个并行实例(返回到主/驱动程序代码?

abstract class MyRichMap extends RichMapFunction[SomeType, Unit] {
protected var someVar: Engine = _ 
override def open(parameters: Configuration): Unit = {
// assume someVar inititation here
....
}
override def map(value: SomeType): Unit = {
engine.process(value)
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
...
someSource.map (new MyRichMap())
env.execute()
// How to get engine or some field of it here? (e.g., engine.someCounter)

解决这个问题的最佳方法是什么?

如果你想测试MyRichMap(),那么你可以从单元测试开始 - 请参阅 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html

如果要测试完整的工作流,单个 JVM 内部的简单方法(例如,运行本地命令行或 Eclipse(是创建一个接收器,将结果捕获到(线程安全的(单例,然后检查内容。这意味着您的源已完成(有界(,因此工作流将终止。

相关内容

  • 没有找到相关文章

最新更新