KafkaConsumer assignment() returns empty



我正在使用

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.1</version>
</dependency>

以下代码段返回非空分配的分区,但poll(0)弃用。

val records = kafkaConsumer.poll(0) // <= deprecated
logInfo(s"Dummy call ${records.count()}")
val partitions = kafkaConsumer.assignment()
logInfo(s"partitions=${partitions}")

以下返回分区:

val records = kafkaConsumer.poll(Duration.ofMillis(0)) // <= not working
logInfo(s"Dummy call ${records.count()}")
val partitions = kafkaConsumer.assignment()
logInfo(s"partitions=${partitions}")

为什么?有什么想法吗?谢谢

这两个调用的区别在于获取元数据的方式。已弃用的poll无限期等待,直到成功检索元数据,而其他poll只尝试一次,通常无法在非常拍摄的时间间隔内连接到协调器(对于您的情况为 0(,并且返回没有任何有用的内容。这就是为什么您在呼叫poll(Duration.ofMillis(0))一次后看到一个空分配的原因。

最新更新