我启动了两名消费者来消费卡夫卡的消息,有六个分区,因此每个消费者都被分配了三个分区。我发现消费者消耗了一段时间后无法对任何消息进行轮询,我可以看到滞后在那里,这意味着仍然有未汇总的消息。
当我杀死该程序并重新启动该程序时,消费者可以消耗其余的消息,但是一段时间后,尽管剩下的消息又停止了。
没有任何错误消息,我的代码的一部分如下:
KafkaConsumer<String, byte[]> consum = new KafkaConsumer<String, byte[]>(props);
ConsumerRecords<String, byte[]> records = kconsumer.poll(timeoutInMS);
我在一个JVM中启动了两个线程,每个线程将创建自身kafkaconsumer。
我使用了asynccommit,我设置了回调类是null。例如:
Map<TopicPartition, OffsetAndMetadata> offsets = ...;
consumer.commitAsync(offsets, null);
这是否引起了问题?
需要查看更多代码,因此很难知道您拥有哪些属性,但是KafkaConsumer
不是线程安全;启动应用程序的多个实例,而不是在同一JVM中的两个线程是明智的。
无论如何,您都可以在Confluent网站上找到可复制的Java消费者的示例,无论是同步提交还是异步。
这是一个更具先进的基本示例,而不是简单的while(true) { poll(); }
,它使您可以在周围扩展和包裹线程或执行人员服务。
public abstract class BasicConsumeLoop<K, V> implements Runnable {
private final KafkaConsumer<K, V> consumer;
private final List<String> topics;
private final AtomicBoolean shutdown;
private final CountDownLatch shutdownLatch;
public BasicConsumeLoop(Properties config, List<String> topics) {
this.consumer = new KafkaConsumer<>(config);
this.topics = topics;
this.shutdown = new AtomicBoolean(false);
this.shutdownLatch = new CountDownLatch(1);
}
public abstract void process(ConsumerRecord<K, V> record);
public void run() {
try {
consumer.subscribe(topics);
while (!shutdown.get()) {
ConsumerRecords<K, V> records = consumer.poll(500);
records.forEach(record -> process(record));
// commit();
}
} finally {
consumer.close();
shutdownLatch.countDown();
}
}
public void shutdown() throws InterruptedException {
shutdown.set(true);
shutdownLatch.await();
}
}