如何正确测试Flink窗口功能



有人知道如何在Flink中测试窗口功能吗?我正在使用依赖关系flink-test-utils_2.11

我的步骤是:

  1. 获取StreamExecutionEnvironment
  2. 创建对象并添加到Invironment
  3. keyBy
  4. 添加一个会话窗口
  5. 执行汇总函数
public class AggregateVariantCEVTest extends AbstractTestBase {
   @Test
    public void testAggregateVariantCev() throws Exception  {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setParallelism(1);
       env.fromElements(objectOne, objectTwo)
               .keyBy(new KeyedByMyCustomKey())
               .window(EventTimeSessionWindows.withGap(Time.seconds(1)))
               .aggregate(new MyAgreggateFunction());

       JobExecutionResult result = env.execute();
       assertEquals(myExpectedResults, result.getAllAccumulatorResults());
   }
}

问题是result.getAllAccumulatorResults()大小为0。

有什么想法我在做什么错?预先感谢!

Windows不会将其结果放入蓄能器中。您应该将测试水槽附加到工作中,然后将该水槽的内容与您的期望进行比较。类似于集成测试部分中的文档中显示的内容。

可能正确的方法是使用TestHarness。一个很好的例子是Flink Project本身中的WindowOperatorTest

此外,您可以查看https://github.com/knaufk/flink-testing-pyramid for示例如何在不同级别的测试金字塔上测试Flink Job和测试的Flink文档https://ci.apache.org/projects/flink/flink-docs-Master/dev/stream/testing.html。

最新更新