Kafka 消费者 - 具有更高优先级的主题



我正在使用Kafka Consumer来阅读几个主题,我需要其中一个具有更高的优先级。处理需要花费大量时间,并且(低优先级(主题中总是有很多消息,但我需要尽快处理来自其他消息的消息。

这与 Kafka 是否支持主题或消息的优先级类似? 但是这个使用的是旧的API。

在新的 API (0.10.1.1( 中,有方法

KafkaConsumer::pause(Collection)
KafkaConsumer::resume(Collection)

但是我不清楚,如何有效检测高优先级主题中有新消息,有必要暂停其他主题的消费。

有什么想法/例子吗?

最后,正如 dawsaw 建议的那样,我解决了这个问题 - 在处理循环中,我存储了我从中读取的所有主题/分区:

  • 开始偏移
  • 结束偏移
  • 提交 - 我不能使用 position,因为我订阅主题,而不是分区。

每当(endOffset - commited) > 0任何优先议题时,我都会就非优先议题打电话给consumer.pause(),并在(endOffset - commited) == 0所有优先议题后再次恢复这些议题。

我想你可以混合使用position((和committed((方法。 position(( 方法获取将要获取的下一条记录的偏移量,committed(( 方法获取给定分区的最后一个提交偏移量(如文档中所述(。 在轮询较低优先级之前,您可以检查 position(( 和 committed(( 以获取较高优先级。如果 position(( 高于 committed((,您可以将较低的优先级暂停 (( 并在较高的优先级 (( 上 poll((,然后恢复较低的优先级。

最新更新