我需要将一些配置传递给自定义的KeyedProcessFunction,我想用KeyedOneInputStreamOperatorTestHarness测试它。处理这个问题的正确方法是什么?
我查看了文档,但在搜索配置时,结果指向flink中现有配置的列表。
谢谢
您可以将配置传递给KeyedProcessFunction的构造函数。类似这样的东西:
DataStream<Event> stream = events
.keyBy(event -> event.key)
.process(new Example(config));
public static class Example extends KeyedProcessFunction<K, I, O> {
private final CONFIG config;
public Example(CONFIG config) {
this.config = config;
}
...
}
然后在你的测试中,你可以提供你想要的任何配置:
private final KeyedProcessFunction<K, I, O> functionToTest = new Example(config);
KeyedProcessOperator<K, I, O> operator = new KeyedProcessOperator<>(function);
KeyedOneInputStreamOperatorTestHarness<K, I, O> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, ...);