Spring云kafka流消费者重试机制



下面的属性是什么意思,我如何使用它们?

spring.cloud.stream.bindings.atcommnity.consumer.maxAttempts=5
spring.cloud.stream.bindings.atcommnity.consumer.backOffInitialInterval=1000
spring.cloud.stream.bindings.atcommnity.consumer.backOffMaxInterval=2000000
spring.cloud.stream.bindings.atcommnity.consumer.backOffMultiplier=2.0
spring.cloud.stream.bindings.atcommnity.consumer.batch-mode=false

后退将从backOffInitialInterval开始,然后每次下一次尝试将乘以backOffMultiplier,但不超过backOffMaxInterval

currentInterval = Math.min(backOffInitialInterval * Math.pow(backOffMultiplier, retryNum), backOffMaxInterval)

在你的例子中,它将是:

1000 ms→2000 ms→4000 ms→8000 ms→16000

女士

上面Alex的回答是对这个问题的完美解释。这里只是为了增加我的知识和分享一些额外的发现。我从代码中提供实际实现。

private BackOff createBackOff(
final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
int maxAttempts = extendedConsumerProperties.getMaxAttempts();
if (maxAttempts < 2) {
return new FixedBackOff(0L, 0L);
}
int initialInterval = extendedConsumerProperties.getBackOffInitialInterval();
double multiplier = extendedConsumerProperties.getBackOffMultiplier();
int maxInterval = extendedConsumerProperties.getBackOffMaxInterval();
ExponentialBackOff backOff = new ExponentialBackOff(initialInterval, multiplier);
backOff.setMaxInterval(maxInterval);
long maxElapsed = extendedConsumerProperties.getBackOffInitialInterval();
double accum = maxElapsed;
for (int i = 1; i < maxAttempts - 1; i++) {
accum = accum * multiplier;
if (accum > maxInterval) {
accum = maxInterval;
}
maxElapsed += accum;
}
backOff.setMaxElapsedTime(maxElapsed);
return backOff;
}
类名KafkaMessageChannelBinder

最新更新