RxJava 中的指数退避



我有一个 API 可以获取触发事件的Observable

我想返回一个Observable,如果检测到 Internet 连接,该值每 defaultDelay 秒发出一个值,如果没有连接,则延迟 numberOfFailedAttempts^2 次。

尝试了很多不同的样式,我遇到的最大问题是retryWhen's可观察性只评估一次:

Observable
    .interval(defaultDelay,TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.io())
    .repeatWhen((observable) ->
         observable.concatMap(repeatObservable -> {
             if(internetConnectionDetector.isInternetConnected()){
                 consecutiveRetries = 0;
                 return observable;
             } else {
                 consecutiveRetries++;
                 int backoffDelay = (int)Math.pow(consecutiveRetries,2);
                 return observable.delay(backoffDelay, TimeUnit.SECONDS);
                }
         }).onBackpressureDrop())
    .onBackpressureDrop();

有什么方法可以做我正在尝试做的事情吗?我找到了一个相关的问题(现在找不到它搜索),但所采取的方法似乎不适用于动态值。

在你的代码中有两个错误:

  1. 为了重复一些可观察的序列,该序列必须是有限的。 即,与其interval,不如使用类似 justfromCallable的东西,就像我在下面的示例中所做的那样。
  2. repeatWhen的内部函数中,您需要返回新的延迟可观察源,因此您必须返回Observable.timer()而不是observable.delay()

工作代码:

public void testRepeat() throws InterruptedException {
    logger.info("test start");
    int DEFAULT_DELAY = 100; // ms
    int ADDITIONAL_DELAY = 100; // ms
    AtomicInteger generator = new AtomicInteger(0);
    AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive
    Disposable subscription = Observable.fromCallable(generator::incrementAndGet)
            .repeatWhen(counts -> {
                AtomicInteger retryCounter = new AtomicInteger(0);
                return counts.flatMap(c -> {
                    int retry = 0;
                    if (connectionAlive.get()) {
                        retryCounter.set(0); // reset counter
                    } else {
                        retry = retryCounter.incrementAndGet();
                    }
                    int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry, 2);
                    logger.info("retry={}, additionalDelay={}ms", retry, additionalDelay);
                    return Observable.timer(DEFAULT_DELAY + additionalDelay, TimeUnit.MILLISECONDS);
                });
            })
            .subscribe(v -> logger.info("got {}", v));
    Thread.sleep(220);
    logger.info("connection dropped");
    connectionAlive.set(false);
    Thread.sleep(2000);
    logger.info("connection is back alive");
    connectionAlive.set(true);
    Thread.sleep(2000);
    subscription.dispose();
    logger.info("test complete");
}

在此处查看有关repeatWhen的详细文章。

我一直发现retryWhen有点低级,所以对于指数退避,我使用了一个经过单元测试的构建器(如Abhijit),可用于rxjava-extras的RxJava 1.x。我建议使用上限版本,这样延迟的指数增长就不会超过您定义的最大值。

这是您使用它的方式:

observable.retryWhen(
    RetryWhen.exponentialBackoff(
        delay, maxDelay, TimeUNIT.SECONDS)
    .build());

我不同意retryWhen有问题,但如果你发现一个错误,请向 RxJava 报告。错误修复速度很快!

你需要 rxjava-extras 0.8.0.6 或更高版本,它位于 Maven Central 上:

<dependency>
    <groupId>com.github.davidmoten</groupId>
    <artifactId>rxjava-extras</artifactId>
    <version>0.8.0.6</version>
</dependency>

如果您需要 RxJava 2.x 版本,请告诉我。同样的功能在 0.1.4 的 rxjava2-extras 中可用。

您可以使用

retryWhen 运算符配置没有连接时的延迟。如何定期发出项目是一个单独的主题(查找intervaltimer运算符)。如果您无法弄清楚,请打开一个单独的问题。

我在我的Github上有一个广泛的例子,但我会在这里给你要点。

RetryWithDelay retryWithDelay = RetryWithDelay.builder()
    .retryDelayStrategy(RetryDelayStrategy.RETRY_COUNT)
    .build()
Single.fromCallable(() -> {
    ...
}).retryWhen(retryWithDelay)
.subscribe(j -> {
    ...
})

RetryWithDelay定义如下。我使用的是 RxJava 2.x,所以如果你使用的是 1.x,签名应该是Func1<Observable<? extends Throwable>, Observable<Object>>的。

public class RetryWithDelay implements
        Function<Flowable<? extends Throwable>, Publisher<Object>> {
    ...
}

RetryWithDelay 类。

重试策略枚举。

这允许我根据RetryDelayStrategy配置各种超时,常量,线性,指数。对于您的使用案例,您将选择延迟策略CONSTANT_DELAY_TIMES_RETRY_COUNT并在构建RetryWithDelay时调用retryDelaySeconds(2)

retryWhen是一个复杂的,甚至可能是错误的操作员。您在网上找到的大多数示例都使用 range 运算符,如果不进行重试,该运算符将失败。有关详细信息,请参阅我的答案 这里.

相关内容

  • 没有找到相关文章

最新更新