假设我有一个具有一些原始状态的自定义 RichFunction。当 flink 作业结束时,如何使状态(来自运算符的每个并行实例(返回到主/驱动程序代码?
abstract class MyRichMap extends RichMapFunction[SomeType, Unit] {
protected var someVar: Engine = _
override def open(parameters: Configuration): Unit = {
// assume someVar inititation here
....
}
override def map(value: SomeType): Unit = {
engine.process(value)
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
...
someSource.map (new MyRichMap())
env.execute()
// How to get engine or some field of it here? (e.g., engine.someCounter)
解决这个问题的最佳方法是什么?
如果你想测试MyRichMap()
,那么你可以从单元测试开始 - 请参阅 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
如果要测试完整的工作流,单个 JVM 内部的简单方法(例如,运行本地命令行或 Eclipse(是创建一个接收器,将结果捕获到(线程安全的(单例,然后检查内容。这意味着您的源已完成(有界(,因此工作流将终止。