消费者无法从卡夫卡(Kafka)进行调查



我启动了两名消费者来消费卡夫卡的消息,有六个分区,因此每个消费者都被分配了三个分区。我发现消费者消耗了一段时间后无法对任何消息进行轮询,我可以看到滞后在那里,这意味着仍然有未汇总的消息。

当我杀死该程序并重新启动该程序时,消费者可以消耗其余的消息,但是一段时间后,尽管剩下的消息又停止了。

没有任何错误消息,我的代码的一部分如下:

 KafkaConsumer<String, byte[]> consum = new KafkaConsumer<String, byte[]>(props);
 ConsumerRecords<String, byte[]> records = kconsumer.poll(timeoutInMS);

我在一个JVM中启动了两个线程,每个线程将创建自身kafkaconsumer。

我使用了asynccommit,我设置了回调类是null。例如:

Map<TopicPartition, OffsetAndMetadata> offsets = ...;
consumer.commitAsync(offsets, null);

这是否引起了问题?

需要查看更多代码,因此很难知道您拥有哪些属性,但是KafkaConsumer不是线程安全;启动应用程序的多个实例,而不是在同一JVM中的两个线程是明智的。

无论如何,您都可以在Confluent网站上找到可复制的Java消费者的示例,无论是同步提交还是异步。

这是一个更具先进的基本示例,而不是简单的while(true) { poll(); },它使您可以在周围扩展和包裹线程或执行人员服务。

public abstract class BasicConsumeLoop<K, V> implements Runnable {
  private final KafkaConsumer<K, V> consumer;
  private final List<String> topics;
  private final AtomicBoolean shutdown;
  private final CountDownLatch shutdownLatch;
  public BasicConsumeLoop(Properties config, List<String> topics) {
    this.consumer = new KafkaConsumer<>(config);
    this.topics = topics;
    this.shutdown = new AtomicBoolean(false);
    this.shutdownLatch = new CountDownLatch(1);
  }
  public abstract void process(ConsumerRecord<K, V> record);
  public void run() {
    try {
      consumer.subscribe(topics);
      while (!shutdown.get()) {
        ConsumerRecords<K, V> records = consumer.poll(500);
        records.forEach(record -> process(record));
        // commit();
      }
    } finally {
      consumer.close();
      shutdownLatch.countDown();
    }
  }
  public void shutdown() throws InterruptedException {
    shutdown.set(true);
    shutdownLatch.await();
  }
}

最新更新