
我正在使用Flink v1.4.0。我正在使用Kafka FLink ConsumerKafka主题使用数据,如下代码所示:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromGroupOffsets(); // the default behaviour
DataStream<String> stream = env.addSource(myConsumer);


由于 Kafka 通常用于连续的数据流,因此使用主题的"所有"可能是一个有意义的概念,也可能不是一个有意义的概念。我建议你看看关于 Flink 如何公开 Kafka 指标的文档,其中包括以下解释:

The difference between the committed offset and the most recent offset in 
each partition is called the consumer lag. If the Flink topology is consuming 
the data slower from the topic than new data is added, the lag will increase 
and the consumer will fall behind. For large production deployments we 
recommend monitoring that metric to avoid increasing latency.


Kafka 它被用作流源,流没有结束。

如果我没记错的话,Flink 的 Kafka 连接器每隔 X 毫秒从主题中提取数据,因为所有 kafka 消费者都是活跃消费者,Kafka 不会通知你主题中是否有新数据。


无论如何,如果你需要读取一批有限数据,你可以使用 Flink 的一些 Windows 或在 Kafka 主题中引入某种标记,来分隔批次的开始和开始。


  • 没有找到相关文章
