如何在Flink中读取第一个N Kafka消息



我正在使用flink来构建源为kafka的管道。为了进行测试,我只想阅读Kafka的前N消息,然后需要停止流。

我该怎么做?我正在使用FlinkKafkaConsumer08

要使用Flink进行弗林克的托管状态,以便您的应用程序可以容忍。但是,如果您愿意忽略这一要求,这可能很简单:

public class Example {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.addSource(...)
           .filter(new Limiter())
           .print();
        env.execute();
    }
    public static class Limiter implements FilterFunction<Event> {
        private transient int count = 0;
        @Override
        public boolean filter(Event e) throws Exception {
            if (++count <= 10) {
                return true;
            } else {
                return false;
            }
        }
    }
}

相关内容

  • 没有找到相关文章

最新更新