如何使用Kafka消费者为Flink CEP编写Junit测试代码



我们有一个execute()方法,使用FlinkKafkaConsumer08作为Flink CEP源,然后我们有了CEP模式,警报再次进入另一个kafka主题。如何为这个execute()方法编写一个junit测试用例?有人能为我提供一个junit代码的示例吗?

Pattern.<WebConnectionUseCase>begin("start")
.where(new SimpleCondition<WebConnectionUseCase>() {
public boolean filter(WebConnectionUseCase event) {
return ((event.getValues().getPredictedAvailableMemory()
- event.getValues().getAvailableMemory()) >= STARTDIFF);
}
}).followedBy("middle").where(new IterativeCondition<WebConnectionUseCase>() {
public boolean filter(WebConnectionUseCase value, Context<WebConnectionUseCase> ctx)
throws Exception {
Iterable<WebConnectionUseCase> middleStops = ctx.getEventsForPattern("middle");
List<Double> diffMemoryList = new ArrayList<Double>();
List<Double> connectionList = new ArrayList<Double>();
middleStops.forEach(item -> diffMemoryList.add(item.getValues().getPredictedAvailableMemory()
- item.getValues().getAvailableMemory()));
middleStops.forEach(item -> connectionList.add(item.getValues().getConnection()));
return checkIncreasingPattern(diffMemoryList) && checkDecreasingPattern(connectionList);
}
private boolean checkDecreasingPattern(List<Double> list) {
//code
}
private boolean checkIncreasingPattern(List<Double> list) {
// code
}
}).times(PATTERNCOUNT).consecutive().next("end").where(new SimpleCondition<WebConnectionUseCase>() {
@Override
public boolean filter(WebConnectionUseCase event) {
return ((event.getValues().getPredictedAvailableMemory()
- event.getValues().getAvailableMemory()) >= ENDDIFF);
}
}).within(Time.minutes(TIMEOUTDURATION));

我会将您想要测试的部分封装在一个对象中,您可以连接到用于测试的特殊源和接收器,以及连接到用于生产的实时数据源/接收器。

对于测试水槽,你可以使用这个:

public static class TestSink<OUT> implements SinkFunction<OUT> {
// must be static
public static final List values = new ArrayList<>();
@Override
public void invoke(OUT value, Context context) throws Exception {
values.add(value);
}
}

然后你的测试可以将sink.value与预期结果进行比较。

编写进行事件时间处理的测试(与使用处理时间的测试相比)更容易,因为随着处理时间的推移,结果是不确定的。而且,为了获得确定性的结果,以1的并行度运行测试更容易。

你会在这里找到一些测试的例子。

相关内容

  • 没有找到相关文章

最新更新