将环境变量传递给键控函数的正确方法是什么,以及如何将其与键控函数一起使用 键控函数



我需要将一些配置传递给自定义的KeyedProcessFunction,我想用KeyedOneInputStreamOperatorTestHarness测试它。处理这个问题的正确方法是什么?

我查看了文档,但在搜索配置时,结果指向flink中现有配置的列表。

谢谢

您可以将配置传递给KeyedProcessFunction的构造函数。类似这样的东西:

DataStream<Event> stream = events
.keyBy(event -> event.key)
.process(new Example(config));
public static class Example extends KeyedProcessFunction<K, I, O> {
private final CONFIG config;
public Example(CONFIG config) {
this.config = config;
}
...
}

然后在你的测试中,你可以提供你想要的任何配置:

private final KeyedProcessFunction<K, I, O> functionToTest = new Example(config);
KeyedProcessOperator<K, I, O> operator = new KeyedProcessOperator<>(function);
KeyedOneInputStreamOperatorTestHarness<K, I, O> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, ...);

最新更新