如果我有 enable.auto.commit=false
,并且我致电 consumer.poll()
而不致电consumer.commitAsync()
,为什么 consumer.poll()
返回下次被称为?
由于我没有提交偏移,我希望poll()
会返回最新的偏移量,该偏移量应该再次是相同的记录。
我要问的是因为我试图在处理过程中处理故障方案。我希望不承诺偏移,poll()
会再次返回相同的记录,以便我可以再次重新处理这些失败的记录。
public class MyConsumer implements Runnable {
@Override
public void run() {
while (true) {
ConsumerRecords<String, LogLine> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord record : records) {
try {
//process record
consumer.commitAsync();
} catch (Exception e) {
}
/**
If exception happens above, I was expecting poll to return new records so I can re-process the record that caused the exception.
**/
}
}
}
}
民意调查的起始偏移不是由经纪人决定的,而是由消费者决定。消费者跟踪最后一个接收的偏移量,并在下一次民意调查中要求以下一堆消息。
当消费者停止或失败时,偏移委员会开始发挥作用,而另一个不知道最后消耗的偏移量的实例是分区的消耗。
kafkaconsumer拥有相当广泛的Javadoc,值得一读。
消费者将从上次提交偏移中读取(如果有任何消费者离开小组或添加了新的消费者(,因此处理de ulplication在kafka中不会直接出现,所以您必须将最后一个过程偏移存储在外部商店中,当发生重新平衡或重新启动时,您应该寻求偏移并开始处理处理,否则您应该对DB中的消息中的某些唯一键进行检查以查找是Doblate
我想分享一些代码如何在Java代码中解决此问题。
这种方法是您对记录进行了调查,尝试处理记录,如果发生例外,您会寻求主题分区的最小值。之后,您进行commitAsync()
。
public class MyConsumer implements Runnable {
@Override
public void run() {
while (true) {
List<ConsumerRecord<String, LogLine>> records = StreamSupport
.stream( consumer.poll(Long.MAX_VALUE).spliterator(), true )
.collect( Collectors.toList() );
boolean exceptionRaised = false;
for (ConsumerRecord<String, LogLine> record : records) {
try {
// process record
} catch (Exception e) {
exceptionRaised = true;
break;
}
}
if( exceptionRaised ) {
Map<TopicPartition, Long> offsetMinimumForTopicAndPartition = records
.stream()
.collect( Collectors.toMap( r -> new TopicPartition( r.topic(), r.partition() ),
ConsumerRecord::offset,
Math::min
) );
for( Map.Entry<TopicPartition, Long> entry : offsetMinimumForTopicAndPartition.entrySet() ) {
consumer.seek( entry.getKey(), entry.getValue() );
}
}
consumer.commitAsync();
}
}
}
使用此设置,您可以一次又一次地对消息进行轮询,直到您成功处理一个民意调查的所有消息。
请注意,您的代码应该能够处理毒药。否则,您的代码将卡在无尽的循环中。