Commit无法完成,因为组已经重新平衡并将分区分配给另一个成员



我使用spring-kafka与recordFilterStrategy。

@Bean("manualImmediateListenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> manualImmediateListenerContainerFactory(
ConsumerFactory<Object, Object> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setPollTimeout(9999999);
factory.setBatchListener(false);
//配置手动提交offset
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setAckDiscarded(true);
factory.setRecordFilterStrategy(new RecordFilterStrategy<Object, Object>() {
@Override
public boolean filter(ConsumerRecord<Object, Object> consumerRecord) {
Shipment shipment = (Shipment) consumerRecord.value();
return shipment.getType().contains("YAW");
}
});
return factory;
}

这里我做了factory.setAckDiscarded(true)。当它收到一个应该丢弃的消息时。它将尝试返回丢弃的消息。然后它会得到一个异常,如下所示。我已经增加了max.poll.interval.ms并减小了批处理的最大大小。任何提示将非常感激!commitfailedexception:提交无法完成,因为组已经重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间长于配置的max.poll.interval.ms,这通常意味着轮询循环在消息处理上花费了太多时间。您可以通过增加max.poll.interval.ms或通过使用max.poll.records减少poll()中返回的批的最大大小来解决这个问题。

我注意到在kafka控制台。它一直在为重新平衡做准备。基本我认为这个问题是由kafka代理不稳定引起的,除了spring应用程序代码有问题。

最新更新