使用新的数据源API对Flink作业进行单元测试



在过去,我的单元通过用可插入的Sources/Sink编写作业来测试flink作业,然后通过简单的Source-/SinkFunctions模拟它们。像这样:

public class Example {
private static SourceFunction<String> someSource;
private static SourceFunction<String> someOtherSource;
private static SinkFunction<String> someSink;
Example(
SourceFunction<String> someSource,
SourceFunction<String> someOtherSource,
SinkFunction<String> someSink
) {
this.someSource = someSource;
this.someOtherSource = someOtherSource;
this.someSink = someSink;
}

void build(StreamExecutionEnvironment env) {
/*
... build your logic here ... 
*/
}

public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Example(
new FlinkKafkaConsumer<String>(...),
new FlinkKafkaConsumer<String>(...),
new FlinkKafkaProducer<String>(...)
).build(env);

env.execute();
}
}

通过这种方式,我可以很容易地测试整个工作,只需交换真实的KafkaSinks&具有自定义Sink-/SourceFunctions的源。

新的DataSource要复杂得多,因为它只是为测试用例实现的。即使我实现了它,它也会以一个通用的地狱结束,让它在构造函数中可注入。所以我想知道什么是最好的方法来对整个工作进行单元测试,而不带出一个完整的Kafka集群。

有什么想法或解决方案吗?

您可以通过基于NumberSequenceSource构建一些东西,然后再构建一个映射来实现这一目标。

FLIP-238中描述的DataGeneratorSource旨在满足这一需求,它将作为1.16的一部分发布。(我相信它是独立的,所以你可以复制它,现在就开始使用它。(

使用可插拔接收器的另一种方法是使用DataStream#executeAndCollect():

DataStream<Integer> stream = env.fromElements(1, 2, 3);
try (CloseableIterator<Integer> results = stream.executeAndCollect()) {
assertThat(results).containsInAnyOrder(1, 2, 3);
}

相关内容

  • 没有找到相关文章

最新更新