消费者.poll()即使没有承诺偏移也返回新记录



如果我有 enable.auto.commit=false,并且我致电 consumer.poll()而不致电consumer.commitAsync(),为什么 consumer.poll()返回下次被称为?

时的新记录

由于我没有提交偏移,我希望poll()会返回最新的偏移量,该偏移量应该再次是相同的记录。

我要问的是因为我试图在处理过程中处理故障方案。我希望不承诺偏移,poll()会再次返回相同的记录,以便我可以再次重新处理这些失败的记录。

public class MyConsumer implements Runnable {
    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, LogLine> records = consumer.poll(Long.MAX_VALUE);
            for (ConsumerRecord record : records) {
                try {
                   //process record
                   consumer.commitAsync();
                } catch (Exception e) {
                }
                /**
                If exception happens above, I was expecting poll to return new records so I can re-process the record that caused the exception. 
                **/
            }
        }
    }
}

民意调查的起始偏移不是由经纪人决定的,而是由消费者决定。消费者跟踪最后一个接收的偏移量,并在下一次民意调查中要求以下一堆消息。

当消费者停止或失败时,偏移委员会开始发挥作用,而另一个不知道最后消耗的偏移量的实例是分区的消耗。

kafkaconsumer拥有相当广泛的Javadoc,值得一读。

消费者将从上次提交偏移中读取(如果有任何消费者离开小组或添加了新的消费者(,因此处理de ulplication在kafka中不会直接出现,所以您必须将最后一个过程偏移存储在外部商店中,当发生重新平衡或重新启动时,您应该寻求偏移并开始处理处理,否则您应该对DB中的消息中的某些唯一键进行检查以查找是Doblate

我想分享一些代码如何在Java代码中解决此问题。

这种方法是您对记录进行了调查,尝试处理记录,如果发生例外,您会寻求主题分区的最小值。之后,您进行commitAsync()

public class MyConsumer implements Runnable {
    @Override
    public void run() {
        while (true) {
            List<ConsumerRecord<String, LogLine>> records = StreamSupport
                .stream( consumer.poll(Long.MAX_VALUE).spliterator(), true )
                .collect( Collectors.toList() );
            boolean exceptionRaised = false;
            for (ConsumerRecord<String, LogLine> record : records) {
                try {
                    // process record
                } catch (Exception e) {
                    exceptionRaised = true;
                    break;
                }
            }
            if( exceptionRaised ) {
                Map<TopicPartition, Long> offsetMinimumForTopicAndPartition = records
                    .stream()
                    .collect( Collectors.toMap( r -> new TopicPartition( r.topic(), r.partition() ),
                        ConsumerRecord::offset,
                        Math::min
                    ) );
                for( Map.Entry<TopicPartition, Long> entry : offsetMinimumForTopicAndPartition.entrySet() ) {
                    consumer.seek( entry.getKey(), entry.getValue() );
                }
            }
            consumer.commitAsync();
        }
    }
}

使用此设置,您可以一次又一次地对消息进行轮询,直到您成功处理一个民意调查的所有消息。

请注意,您的代码应该能够处理毒药。否则,您的代码将卡在无尽的循环中。

最新更新