在过去,我的单元通过用可插入的Sources/Sink编写作业来测试flink作业,然后通过简单的Source-/SinkFunctions
模拟它们。像这样:
public class Example {
private static SourceFunction<String> someSource;
private static SourceFunction<String> someOtherSource;
private static SinkFunction<String> someSink;
Example(
SourceFunction<String> someSource,
SourceFunction<String> someOtherSource,
SinkFunction<String> someSink
) {
this.someSource = someSource;
this.someOtherSource = someOtherSource;
this.someSink = someSink;
}
void build(StreamExecutionEnvironment env) {
/*
... build your logic here ...
*/
}
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Example(
new FlinkKafkaConsumer<String>(...),
new FlinkKafkaConsumer<String>(...),
new FlinkKafkaProducer<String>(...)
).build(env);
env.execute();
}
}
通过这种方式,我可以很容易地测试整个工作,只需交换真实的KafkaSinks&具有自定义Sink-/SourceFunctions
的源。
新的DataSource要复杂得多,因为它只是为测试用例实现的。即使我实现了它,它也会以一个通用的地狱结束,让它在构造函数中可注入。所以我想知道什么是最好的方法来对整个工作进行单元测试,而不带出一个完整的Kafka集群。
有什么想法或解决方案吗?
您可以通过基于NumberSequenceSource
构建一些东西,然后再构建一个映射来实现这一目标。
FLIP-238中描述的DataGeneratorSource
旨在满足这一需求,它将作为1.16的一部分发布。(我相信它是独立的,所以你可以复制它,现在就开始使用它。(
使用可插拔接收器的另一种方法是使用DataStream#executeAndCollect()
:
DataStream<Integer> stream = env.fromElements(1, 2, 3);
try (CloseableIterator<Integer> results = stream.executeAndCollect()) {
assertThat(results).containsInAnyOrder(1, 2, 3);
}