Kafka 0.9 新的消费者 API ---如何只观察消费者偏移量



我正在尝试使用 Java API 监视给定组的消费者偏移量。我创建了一个额外的消费者,它不订阅任何主题,而只是调用consumer.committed(topic)来获取偏移量信息。这种工作,但是:

为了进行测试,我只使用一个真实的消费者(即订阅该主题的消费者(。当我使用 close() 关闭它并稍后重新启动它时,尽管我使用 poll(1000) ,但在订阅和第一次使用消息之间需要 27 秒。

我猜这与重新平衡可能被非订阅消费者混淆有关。这可能吗?有没有更好的方法来监控 Java API 的偏移量(我知道命令行工具,但需要使用 API(。

有不同的方法可以检查主题偏移量,具体取决于您想要它的目的,除了上面描述的"提交"之外,这里还有两个选项:

1( 如果你想知道消费者下次线程启动时从代理开始获取数据的偏移量 ID,那么你必须使用"position"作为

long offsetPosition;
TopicPartition tPartition = new TopicPartition(topic,partitionToReview);
    offsetPosition = kafkaConsumer.position(tPartition);
    System.out.println("offset of the next record to fetch is : " + position);

2( 从 ConsumerRecord 对象调用 "offset((" 方法,在执行 kafkaConsumer 的轮询后

Iterator<ConsumerRecord<byte[],byte[]>> it = kafkaConsumer.poll(1000).iterator();
while(it.hasNext()){
ConsumerRecord<byte[],byte[]> record = it.next();
System.out.println("offset : " + record.offset());
}

找到了:监控消费者增加了混乱,但不是罪魁祸首。最后,它很容易理解,尽管有点出乎意料(至少对我来说(:

session.timeout.ms的默认值为 30 秒。当使用者消失时,最多需要 30 秒才能宣布死亡并重新平衡工作。为了进行测试,我已经停止了我拥有的单个消费者,等待了三秒钟,然后重新启动了一个新的消费者。然后,这在开始之前需要 27 秒,填补了 30 秒的超时时间。

我本以为一个单独的消费者启动不会等待超时到期,而是开始"重新平衡",即立即抓住工作。似乎超时必须在重新平衡工作之前到期,即使只有一个使用者也是如此。

为了使测试更快地完成,我更改了配置,为使用者使用较低的session.timeout.ms,为代理使用较低的group.min.session.timeout.ms

总结:使用不订阅任何主题的使用者来监控偏移量就可以了,并且似乎不会干扰重新平衡过程。

最新更新