我怎么知道我已经吃完了所有的卡夫卡主题



我正在使用Flink v1.4.0。我正在使用Kafka FLink ConsumerKafka主题使用数据,如下代码所示:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromGroupOffsets(); // the default behaviour
DataStream<String> stream = env.addSource(myConsumer);
...

有没有办法知道我是否已经消耗了整个主题?如何监控偏移量?(这是否足以确认我已经使用了Kafka主题中的所有数据?

由于 Kafka 通常用于连续的数据流,因此使用主题的"所有"可能是一个有意义的概念,也可能不是一个有意义的概念。我建议你看看关于 Flink 如何公开 Kafka 指标的文档,其中包括以下解释:

The difference between the committed offset and the most recent offset in 
each partition is called the consumer lag. If the Flink topology is consuming 
the data slower from the topic than new data is added, the lag will increase 
and the consumer will fall behind. For large production deployments we 
recommend monitoring that metric to avoid increasing latency.

因此,如果消费者滞后为零,那么您就被赶上了。也就是说,您可能希望能够自己比较偏移量,但我不知道有什么简单的方法可以做到这一点。

Kafka 它被用作流源,流没有结束。

如果我没记错的话,Flink 的 Kafka 连接器每隔 X 毫秒从主题中提取数据,因为所有 kafka 消费者都是活跃消费者,Kafka 不会通知你主题中是否有新数据。

因此,在您的情况下,只需设置超时,如果您在该时间内没有读取数据,则您已经读取了主题中的所有数据。

无论如何,如果你需要读取一批有限数据,你可以使用 Flink 的一些 Windows 或在 Kafka 主题中引入某种标记,来分隔批次的开始和开始。

相关内容

  • 没有找到相关文章

最新更新