我正在尝试关闭在我们的junit集成测试之一中启动的streamExecutionEnvironment。一旦处理了流中的所有项目,我希望能够以确定性的方式关闭此执行环境。
现在,当我调用streamExecutionEnvironment.ecute方法时,它永远不会从该调用中返回。
[我在这里可能为时已晚,但会为那些正在寻找答案或提示的人回答]
实际上您需要做的是优雅地从SourceFunction<T>
退出。然后,整个StreamExecutionEnvironment
将自动关闭。为此,您可能需要一个特殊的结束事件才能发送到源功能。
写或扩展(如果使用预定义的源功能(检查特殊传入事件,该事件将在集成测试结束时发出,并断开循环或从源中取消订阅。基本模式将如下所示。
public class TestSource implements SourceFunction<Event> {
public void run(SourceContext<T> ctx) throws Exception {
while (hasContent()) {
Event event = readNextEvent();
if (isAnEndEvent(event)) {
break;
}
ctx.collect(event);
}
}
}
取决于您的情况,在Junit测试中,您应该在每个/所有测试用例的结尾发送此特殊事件。
// or @AfterClass
@After
public void doFinish() {
// send the special event from a different thread.
}
有时您可能必须从其他线程(基本上是生成测试事件的地方(来执行此操作。不像上述。
由于此问题,建议您为测试实现单独的源函数,因此更容易修改接受特殊的关闭事件。但是从未在您的实际源函数中期望进入生产。:(
sourceFunction的Javadoc还解释了stop()
函数,您可以从TwitterSource
如何实现中看到一个示例。
优雅地停止功能
功能可以另外实现{@link org.apache.flink.api.common.functions.stoppableFunction}接口。 与"取消"相比,"停止"功能是一种优雅 离开国家和发射元素的出口一致 状态
。