我如何优雅地关闭以执行方法启动的streamExecutionEnvironment



我正在尝试关闭在我们的junit集成测试之一中启动的streamExecutionEnvironment。一旦处理了流中的所有项目,我希望能够以确定性的方式关闭此执行环境。

现在,当我调用streamExecutionEnvironment.ecute方法时,它永远不会从该调用中返回。

[我在这里可能为时已晚,但会为那些正在寻找答案或提示的人回答]

实际上您需要做的是优雅地从SourceFunction<T>退出。然后,整个StreamExecutionEnvironment将自动关闭。为此,您可能需要一个特殊的结束事件才能发送到源功能。

写或扩展(如果使用预定义的源功能(检查特殊传入事件,该事件将在集成测试结束时发出,并断开循环或从源中取消订阅。基本模式将如下所示。

public class TestSource implements SourceFunction<Event> {
    public void run(SourceContext<T> ctx) throws Exception {
        while (hasContent()) {
            Event event = readNextEvent();
            if (isAnEndEvent(event)) {
              break;
            }
            ctx.collect(event);
        }
    }
}

取决于您的情况,在Junit测试中,您应该在每个/所有测试用例的结尾发送此特殊事件。

// or @AfterClass
@After   
public void doFinish() {
    // send the special event from a different thread.
}

有时您可能必须从其他线程(基本上是生成测试事件的地方(来执行此操作。不像上述。

由于此问题,建议您为测试实现单独的源函数,因此更容易修改接受特殊的关闭事件。但是从未在您的实际源函数中期望进入生产。:(


sourceFunction的Javadoc还解释了stop()函数,您可以从TwitterSource如何实现中看到一个示例。

优雅地停止功能
功能可以另外实现{@link org.apache.flink.api.common.functions.stoppableFunction}接口。 与"取消"相比,"停止"功能是一种优雅 离开国家和发射元素的出口一致 状态

相关内容

  • 没有找到相关文章

最新更新