编写 Rx "RetryAfter"扩展方法



在IntroToRx一书中,作者建议为I/O编写一个"智能"重试,在一段时间后重试I/O请求,如网络请求。

这是确切的段落:

添加到您自己的库中的有用扩展方法可能是"返回" 关闭并重试"方法。与我合作过的团队已经找到了这样一个 功能在执行 I/O 时很有用,尤其是网络请求。这 概念是尝试,并在失败时等待给定的时间段和 然后重试。此方法的版本可能会考虑 要重试的异常类型,以及最大数量 重试的次数。您甚至可能希望将等待期延长至 在每次后续重试时不要那么激进。

不幸的是,我不知道如何编写此方法。 :(

这种回退重试实现的关键是延迟可观察量。延迟可观察量在有人订阅之前不会执行其工厂。它将为每个订阅调用工厂,使其成为我们重试方案的理想选择。

假设我们有一个触发网络请求的方法。

public IObservable<WebResponse> SomeApiMethod() { ... }

出于这个小片段的目的,让我们将延迟定义为source

var source = Observable.Defer(() => SomeApiMethod());

每当有人订阅源代码时,它都会调用SomeApiMethod并启动新的Web请求。每当它失败时重试它的天真方法是使用内置的重试运算符。

source.Retry(4)

不过,这对 API 来说不是很好,而且这不是您所要求的。我们需要在每次尝试之间延迟请求的启动。一种方法是延迟订阅。

Observable.Defer(() => source.DelaySubscription(TimeSpan.FromSeconds(1))).Retry(4)

这并不理想,因为它即使在第一个请求上也会增加延迟,让我们解决这个问题。

int attempt = 0;
Observable.Defer(() => { 
   return ((++attempt == 1)  ? source : source.DelaySubscription(TimeSpan.FromSeconds(1)))
})
.Retry(4)
.Select(response => ...)

只是暂停一秒钟不是一个很好的重试方法,所以让我们将该常量更改为接收重试计数并返回适当延迟的函数。指数回退很容易实现。

Func<int, TimeSpan> strategy = n => TimeSpan.FromSeconds(Math.Pow(n, 2));
((++attempt == 1)  ? source : source.DelaySubscription(strategy(attempt - 1)))

我们现在几乎完成了,我们只需要添加一种方法来指定我们应该重试哪些异常。让我们添加一个函数,该函数给定异常返回重试是否有意义,我们将其称为 retryOnError。

现在我们需要编写一些看起来很可怕的代码,但请耐心等待。

Observable.Defer(() => {
    return ((++attempt == 1)  ? source : source.DelaySubscription(strategy(attempt - 1)))
        .Select(item => new Tuple<bool, WebResponse, Exception>(true, item, null))
        .Catch<Tuple<bool, WebResponse, Exception>, Exception>(e => retryOnError(e)
            ? Observable.Throw<Tuple<bool, WebResponse, Exception>>(e)
            : Observable.Return(new Tuple<bool, WebResponse, Exception>(false, null, e)));
})
.Retry(retryCount)
.SelectMany(t => t.Item1
    ? Observable.Return(t.Item2)
    : Observable.Throw<T>(t.Item3))

所有这些尖括号都是为了封送一个例外,我们不应该重试.Retry()。我们已经将内部可观察性设置为一个IObservable<Tuple<bool, WebResponse, Exception>>,其中第一个布尔值指示我们是否有响应或异常。如果 retryOnError 指示我们应该重试特定异常,则内部可观察量将抛出该异常,并且该异常将被重试拾取。SelectMany 只是解开我们的元组,并使生成的可观察性再次IObservable<WebRequest>

查看我的要点,包括完整源代码和最终版本的测试。有了这个运算符,我们可以非常简洁地编写重试代码

Observable.Defer(() => SomApiMethod())
  .RetryWithBackoffStrategy(
     retryCount: 4, 
     retryOnError: e => e is ApiRetryWebException
  )

也许我过度简化了情况,但如果我们看一下 Retry 的实现,它只是一个 Observable.Catch over 无限可枚举的可观察量:

private static IEnumerable<T> RepeatInfinite<T>(T value)
{
    while (true)
        yield return value;
}
public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source)
{
    return Observable.Catch<TSource>(QueryLanguage.RepeatInfinite<IObservable<TSource>(source));
}

因此,如果我们采用这种方法,我们可以在第一次收益后添加延迟。

private static IEnumerable<IObservable<TSource>> RepeateInfinite<TSource> (IObservable<TSource> source, TimeSpan dueTime)
{
    // Don't delay the first time        
    yield return source;
    while (true)
        yield return source.DelaySubscription(dueTime);
    }
public static IObservable<TSource> RetryAfterDelay<TSource>(this IObservable<TSource> source, TimeSpan dueTime)
{
    return RepeateInfinite(source, dueTime).Catch();
}

使用重试计数捕获特定异常的重载可以更加简洁:

public static IObservable<TSource> RetryAfterDelay<TSource, TException>(this IObservable<TSource> source, TimeSpan dueTime, int count) where TException : Exception
{
    return source.Catch<TSource, TException>(exception =>
    {
        if (count <= 0)
        {
            return Observable.Throw<TSource>(exception);
        }
        return source.DelaySubscription(dueTime).RetryAfterDelay<TSource, TException>(dueTime, --count);
    });
}

请注意,此处的重载使用的是递归。乍一看,如果 count 是 Int32.MaxValue 之类的东西,似乎 StackOverflowException 是可能的。但是,DelaySubscription 使用调度程序来运行订阅操作,因此堆栈溢出是不可能的(即使用"蹦床"(。我想通过查看代码,这并不明显。我们可以通过将 DelaySubscription 重载中的调度程序显式设置为 Scheduler.Imimmediate 并传入 TimeSpan.Zero 和 Int32.MaxValue 来强制堆栈溢出。我们可以传递一个非即时调度程序来更明确地表达我们的意图,例如:

return source.DelaySubscription(dueTime, TaskPoolScheduler.Default).RetryAfterDelay<TSource, TException>(dueTime, --count);

更新:添加了重载以引入特定的调度程序。

public static IObservable<TSource> RetryAfterDelay<TSource, TException>(
    this IObservable<TSource> source,
    TimeSpan retryDelay,
    int retryCount,
    IScheduler scheduler) where TException : Exception
{
    return source.Catch<TSource, TException>(
        ex =>
        {
            if (retryCount <= 0)
            {
                return Observable.Throw<TSource>(ex);
            }
            return
                source.DelaySubscription(retryDelay, scheduler)
                    .RetryAfterDelay<TSource, TException>(retryDelay, --retryCount, scheduler);
        });
} 
这是我

正在使用的那个:

public static IObservable<T> DelayedRetry<T>(this IObservable<T> src, TimeSpan delay)
{
    Contract.Requires(src != null);
    Contract.Ensures(Contract.Result<IObservable<T>>() != null);
    if (delay == TimeSpan.Zero) return src.Retry();
    return src.Catch(Observable.Timer(delay).SelectMany(x => src).Retry());
}

根据马库斯的回答,我写了以下内容:

public static class ObservableExtensions
{
    private static IObservable<T> BackOffAndRetry<T>(
        this IObservable<T> source,
        Func<int, TimeSpan> strategy,
        Func<int, Exception, bool> retryOnError,
        int attempt)
    {
        return Observable
            .Defer(() =>
            {
                var delay = attempt == 0 ? TimeSpan.Zero : strategy(attempt);
                var s = delay == TimeSpan.Zero ? source : source.DelaySubscription(delay);
                return s
                    .Catch<T, Exception>(e =>
                    {
                        if (retryOnError(attempt, e))
                        {
                            return source.BackOffAndRetry(strategy, retryOnError, attempt + 1);
                        }
                        return Observable.Throw<T>(e);
                    });
            });
    }
    public static IObservable<T> BackOffAndRetry<T>(
        this IObservable<T> source,
        Func<int, TimeSpan> strategy,
        Func<int, Exception, bool> retryOnError)
    {
        return source.BackOffAndRetry(strategy, retryOnError, 0);
    }
}

我更喜欢它,因为

  • 它不会修改attempts而是使用递归。
  • 它不使用retries,但传递尝试retryOnError的次数

这是我在研究 Rxx 如何做到这一点时想到的另一个稍微不同的实现。因此,它在很大程度上是Rxx方法的精简版本。

签名与马库斯的版本略有不同。您指定要重试的异常类型,延迟策略采用异常和重试计数,因此每次连续重试等可能会有更长的延迟。

我不能保证它是防错的,或者最好的方法,但它似乎有效。

public static IObservable<TSource> RetryWithDelay<TSource, TException>(this IObservable<TSource> source, Func<TException, int, TimeSpan> delayFactory, IScheduler scheduler = null)
where TException : Exception
{
    return Observable.Create<TSource>(observer =>
    {
        scheduler = scheduler ?? Scheduler.CurrentThread;
        var disposable = new SerialDisposable();
        int retryCount = 0;
        var scheduleDisposable = scheduler.Schedule(TimeSpan.Zero,
        self =>
        {
            var subscription = source.Subscribe(
            observer.OnNext,
            ex =>
            {
                var typedException = ex as TException;
                if (typedException != null)
                {
                    var retryDelay = delayFactory(typedException, ++retryCount);
                    self(retryDelay);
                }
                else
                {
                    observer.OnError(ex);
                }
            },
            observer.OnCompleted);
            disposable.Disposable = subscription;
        });
        return new CompositeDisposable(scheduleDisposable, disposable);
    });
}

这是我想出的那个。

不想将单个重试的项目连接成一个序列,而是在每次重试时发出整个源序列 - 因此运算符返回一个IObservable<IObservable<TSource>>。如果不需要,可以简单地将其Switch()回一个序列。

(背景:在我的用例中,源是一个热序列,我GroupByUntil出现一个关闭组的项目。如果在两次重试之间丢失了此项,则组永远不会关闭,从而导致内存泄漏。拥有序列序列仅允许对内部序列进行分组(或异常处理或......

/// <summary>
/// Repeats <paramref name="source"/> in individual windows, with <paramref name="interval"/> time in between.
/// </summary>
public static IObservable<IObservable<TSource>> RetryAfter<TSource>(this IObservable<TSource> source, TimeSpan interval, IScheduler scheduler = null)
{
    if (scheduler == null) scheduler = Scheduler.Default;
    return Observable.Create<IObservable<TSource>>(observer =>
    {
        return scheduler.Schedule(self =>
        {
            observer.OnNext(Observable.Create<TSource>(innerObserver =>
            {
                return source.Subscribe(
                    innerObserver.OnNext,
                    ex => { innerObserver.OnError(ex); scheduler.Schedule(interval, self); },
                    () => { innerObserver.OnCompleted(); scheduler.Schedule(interval, self); });
            }));
        });
    });
}

相关内容

  • 没有找到相关文章

最新更新