我写了一个非常简单的Flink流作业,它使用FlinkKafkaConsumer082
从Kafka获取数据。
protected DataStream<String> getKafkaStream(StreamExecutionEnvironment env, String topic) {
Properties result = new Properties();
result.put("bootstrap.servers", getBrokerUrl());
result.put("zookeeper.connect", getZookeeperUrl());
result.put("group.id", getGroup());
return env.addSource(
new FlinkKafkaConsumer082<>(
topic,
new SimpleStringSchema(), result);
}
这非常有效,每当我把一些东西放进卡夫卡的主题中时,我的Flink工作就会收到并处理它。现在我试着看看如果我的Flink Job因为某种原因没有在线会发生什么。因此,我关闭了"flink"工作,继续给卡夫卡发信息。然后我又开始了我的Flink工作,希望它能处理同时发送的消息。
然而,我收到了这样的消息:
No prior offsets found for some partitions in topic collector.Customer. Fetched the following start offsets [FetchPartition {partition=0, offset=25}]
因此,它基本上忽略了自上次关闭Flink作业以来收到的所有消息,只是在队列末尾开始读取。从我收集的FlinkKafkaConsumer082
的文档中,它自动负责与Kafka代理同步处理的偏移。然而,情况似乎并非如此。
我使用的是单节点Kafka安装(Kafka发行版附带的安装)和单节点Zookeper安装(也是与Kafka发布版捆绑的安装)。
我怀疑这是某种错误的配置或类似的东西,但我真的不知道从哪里开始寻找。其他人有没有遇到过这个问题,也许已经解决了?
我找到了原因。您需要在StreamExecutionEnvironment
中显式启用检查点,以使Kafka连接器将处理后的偏移写入Zookeeper。如果你不启用它,Kafka连接器将不会写入最后一个读取偏移量,因此当收集作业重新启动时,它将无法从那里恢复。所以一定要写:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(); // <-- this is the important part
Anatoly关于更改初始偏移量的建议可能仍然是个好主意,以防检查点由于某种原因而失败。
https://kafka.apache.org/08/configuration.html
将auto.offset.reset设置为最小(默认为最大)
自动偏移重置:
当Zookeeper中没有初始偏移时该怎么办,或者如果偏移超出范围:
最小:自动将偏移重置为最小偏移
最大:自动将偏移重置为最大偏移
其他任何操作:向消费者抛出异常。
如果将其设置为最大值,则当它订阅的主题的分区数在代理上发生更改时,使用者可能会丢失一些消息。到为了防止在添加分区时丢失数据,请将auto.offset.reset设置为最小
还要确保getGroup()在重新启动后是相同的