我正在使用Kafka Consumer来阅读几个主题,我需要其中一个具有更高的优先级。处理需要花费大量时间,并且(低优先级(主题中总是有很多消息,但我需要尽快处理来自其他消息的消息。
这与 Kafka 是否支持主题或消息的优先级类似? 但是这个使用的是旧的API。
在新的 API (0.10.1.1( 中,有方法
KafkaConsumer::pause(Collection)
KafkaConsumer::resume(Collection)
但是我不清楚,如何有效检测高优先级主题中有新消息,有必要暂停其他主题的消费。
有什么想法/例子吗?
最后,正如 dawsaw 建议的那样,我解决了这个问题 - 在处理循环中,我存储了我从中读取的所有主题/分区:
- 开始偏移
- 结束偏移
- 提交 - 我不能使用 position,因为我订阅主题,而不是分区。
每当(endOffset - commited) > 0
任何优先议题时,我都会就非优先议题打电话给consumer.pause()
,并在(endOffset - commited) == 0
所有优先议题后再次恢复这些议题。
我想你可以混合使用position((和committed((方法。 position(( 方法获取将要获取的下一条记录的偏移量,committed(( 方法获取给定分区的最后一个提交偏移量(如文档中所述(。 在轮询较低优先级之前,您可以检查 position(( 和 committed(( 以获取较高优先级。如果 position(( 高于 committed((,您可以将较低的优先级暂停 (( 并在较高的优先级 (( 上 poll((,然后恢复较低的优先级。