我正在使用flink来构建源为kafka的管道。为了进行测试,我只想阅读Kafka的前N消息,然后需要停止流。
我该怎么做?我正在使用FlinkKafkaConsumer08
。
要使用Flink进行弗林克的托管状态,以便您的应用程序可以容忍。但是,如果您愿意忽略这一要求,这可能很简单:
public class Example {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.addSource(...)
.filter(new Limiter())
.print();
env.execute();
}
public static class Limiter implements FilterFunction<Event> {
private transient int count = 0;
@Override
public boolean filter(Event e) throws Exception {
if (++count <= 10) {
return true;
} else {
return false;
}
}
}
}