如何确保我总是从Flink的Kafka主题开始消费
对于作为Flink 1.0.2一部分的Kafka 0.9.x消费者,似乎不再是Kafka而是Flink来控制偏移:
Flink在内部快照偏移,作为其分布式检查点。Kafka/ZooKeeper承诺的补偿只是为了让外界对进展的看法与Flink的保持同步对进展的看法。这样,监控和其他工作可以获得关于Flink Kafka消费者消费了多少的观点是一个话题。
这就是我所走的路,但我的Flink程序总是从它停止的地方开始,并且不会按照配置指示返回到开始:
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "myflinkservice")
props.setProperty("auto.offset.reset", "earliest")
val incomingData = env.addSource(
new FlinkKafkaConsumer09[IncomingDataRecord](
"my.topic.name",
new IncomingDataSchema,
props
)
)
使用:
consumer.setStartFromEarliest();
我认为您可以通过指定一个随机group.id
:来解决这个问题
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", s"myflinkservice_${UUID.randomUUID}")
props.setProperty("auto.offset.reset", "smallest") // "smallest", not "earliest"
auto.offset.reset
仅在ZooKeeper中没有可用的初始偏移时工作。