如何在kafka中正确实现指数重试



我是卡夫卡的初学者
我一直在尝试在Kafka consumer中实现失败记录的指数重试。重试4次后,消费者需要关闭。重试应在1分钟后进行,然后在5分钟后再次进行,然后再在15分钟后进行30分钟。在所有这些尝试之后,如果重试不成功,那么我需要关闭消费者。我已经做了以下操作来实现它。但在5分钟后(max.poll.interval(,consoumer重新平衡。如何完成所有重试尝试(失败时尝试5次(,然后关闭消费者?

ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(60000);
backOffPolicy.setMultiplier(5);
backOffPolicy.setMaxInterval(900000);

RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy());
retryTemplate.setBackOffPolicy(backOffPolicy);
return retryTemplate;
}
private RetryPolicy retryPolicy() {
Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
exceptionMap.put(IllegalArgumentException.class, false);
exceptionMap.put(RecoverableDataAccessException.class, true);
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(4, exceptionMap, true);
return simpleRetryPolicy;
}

max.poll.interval是kafka集群在将其视为"之前等待来自消费者的轮询调用的时间上限;死了";。一旦一个消费者群体中的一个消费者被认为已经死亡,卡夫卡集群就会触发该消费者群体的重新平衡。

在您的用例中,您希望等待>进行后续轮询前51分钟(重试时间+实际处理时间(。因此,您需要将此属性增加到足够的值(>51分钟(,以便kafka集群不会认为消费者已经死亡。

最新更新