有人知道如何在Flink
中测试窗口功能吗?我正在使用依赖关系flink-test-utils_2.11
。
我的步骤是:
- 获取
StreamExecutionEnvironment
- 创建对象并添加到Invironment
- 做
keyBy
- 添加一个会话窗口
- 执行汇总函数
public class AggregateVariantCEVTest extends AbstractTestBase {
@Test
public void testAggregateVariantCev() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.fromElements(objectOne, objectTwo)
.keyBy(new KeyedByMyCustomKey())
.window(EventTimeSessionWindows.withGap(Time.seconds(1)))
.aggregate(new MyAgreggateFunction());
JobExecutionResult result = env.execute();
assertEquals(myExpectedResults, result.getAllAccumulatorResults());
}
}
问题是result.getAllAccumulatorResults()
大小为0。
有什么想法我在做什么错?预先感谢!
Windows不会将其结果放入蓄能器中。您应该将测试水槽附加到工作中,然后将该水槽的内容与您的期望进行比较。类似于集成测试部分中的文档中显示的内容。
可能正确的方法是使用TestHarness
。一个很好的例子是Flink Project本身中的WindowOperatorTest
。
此外,您可以查看https://github.com/knaufk/flink-testing-pyramid for示例如何在不同级别的测试金字塔上测试Flink Job和测试的Flink文档https://ci.apache.org/projects/flink/flink-docs-Master/dev/stream/testing.html。