在IntroToRx一书中,作者建议为I/O编写一个"智能"重试,在一段时间后重试I/O请求,如网络请求。
这是确切的段落:
添加到您自己的库中的有用扩展方法可能是"返回" 关闭并重试"方法。与我合作过的团队已经找到了这样一个 功能在执行 I/O 时很有用,尤其是网络请求。这 概念是尝试,并在失败时等待给定的时间段和 然后重试。此方法的版本可能会考虑 要重试的异常类型,以及最大数量 重试的次数。您甚至可能希望将等待期延长至 在每次后续重试时不要那么激进。
不幸的是,我不知道如何编写此方法。 :(
这种回退重试实现的关键是延迟可观察量。延迟可观察量在有人订阅之前不会执行其工厂。它将为每个订阅调用工厂,使其成为我们重试方案的理想选择。
假设我们有一个触发网络请求的方法。
public IObservable<WebResponse> SomeApiMethod() { ... }
出于这个小片段的目的,让我们将延迟定义为source
var source = Observable.Defer(() => SomeApiMethod());
每当有人订阅源代码时,它都会调用SomeApiMethod并启动新的Web请求。每当它失败时重试它的天真方法是使用内置的重试运算符。
source.Retry(4)
不过,这对 API 来说不是很好,而且这不是您所要求的。我们需要在每次尝试之间延迟请求的启动。一种方法是延迟订阅。
Observable.Defer(() => source.DelaySubscription(TimeSpan.FromSeconds(1))).Retry(4)
这并不理想,因为它即使在第一个请求上也会增加延迟,让我们解决这个问题。
int attempt = 0;
Observable.Defer(() => {
return ((++attempt == 1) ? source : source.DelaySubscription(TimeSpan.FromSeconds(1)))
})
.Retry(4)
.Select(response => ...)
只是暂停一秒钟不是一个很好的重试方法,所以让我们将该常量更改为接收重试计数并返回适当延迟的函数。指数回退很容易实现。
Func<int, TimeSpan> strategy = n => TimeSpan.FromSeconds(Math.Pow(n, 2));
((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1)))
我们现在几乎完成了,我们只需要添加一种方法来指定我们应该重试哪些异常。让我们添加一个函数,该函数给定异常返回重试是否有意义,我们将其称为 retryOnError。
现在我们需要编写一些看起来很可怕的代码,但请耐心等待。
Observable.Defer(() => {
return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1)))
.Select(item => new Tuple<bool, WebResponse, Exception>(true, item, null))
.Catch<Tuple<bool, WebResponse, Exception>, Exception>(e => retryOnError(e)
? Observable.Throw<Tuple<bool, WebResponse, Exception>>(e)
: Observable.Return(new Tuple<bool, WebResponse, Exception>(false, null, e)));
})
.Retry(retryCount)
.SelectMany(t => t.Item1
? Observable.Return(t.Item2)
: Observable.Throw<T>(t.Item3))
所有这些尖括号都是为了封送一个例外,我们不应该重试.Retry()
。我们已经将内部可观察性设置为一个IObservable<Tuple<bool, WebResponse, Exception>>
,其中第一个布尔值指示我们是否有响应或异常。如果 retryOnError 指示我们应该重试特定异常,则内部可观察量将抛出该异常,并且该异常将被重试拾取。SelectMany 只是解开我们的元组,并使生成的可观察性再次IObservable<WebRequest>
。
查看我的要点,包括完整源代码和最终版本的测试。有了这个运算符,我们可以非常简洁地编写重试代码
Observable.Defer(() => SomApiMethod())
.RetryWithBackoffStrategy(
retryCount: 4,
retryOnError: e => e is ApiRetryWebException
)
也许我过度简化了情况,但如果我们看一下 Retry 的实现,它只是一个 Observable.Catch over 无限可枚举的可观察量:
private static IEnumerable<T> RepeatInfinite<T>(T value)
{
while (true)
yield return value;
}
public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source)
{
return Observable.Catch<TSource>(QueryLanguage.RepeatInfinite<IObservable<TSource>(source));
}
因此,如果我们采用这种方法,我们可以在第一次收益后添加延迟。
private static IEnumerable<IObservable<TSource>> RepeateInfinite<TSource> (IObservable<TSource> source, TimeSpan dueTime)
{
// Don't delay the first time
yield return source;
while (true)
yield return source.DelaySubscription(dueTime);
}
public static IObservable<TSource> RetryAfterDelay<TSource>(this IObservable<TSource> source, TimeSpan dueTime)
{
return RepeateInfinite(source, dueTime).Catch();
}
使用重试计数捕获特定异常的重载可以更加简洁:
public static IObservable<TSource> RetryAfterDelay<TSource, TException>(this IObservable<TSource> source, TimeSpan dueTime, int count) where TException : Exception
{
return source.Catch<TSource, TException>(exception =>
{
if (count <= 0)
{
return Observable.Throw<TSource>(exception);
}
return source.DelaySubscription(dueTime).RetryAfterDelay<TSource, TException>(dueTime, --count);
});
}
请注意,此处的重载使用的是递归。乍一看,如果 count 是 Int32.MaxValue 之类的东西,似乎 StackOverflowException 是可能的。但是,DelaySubscription 使用调度程序来运行订阅操作,因此堆栈溢出是不可能的(即使用"蹦床"(。我想通过查看代码,这并不明显。我们可以通过将 DelaySubscription 重载中的调度程序显式设置为 Scheduler.Imimmediate 并传入 TimeSpan.Zero 和 Int32.MaxValue 来强制堆栈溢出。我们可以传递一个非即时调度程序来更明确地表达我们的意图,例如:
return source.DelaySubscription(dueTime, TaskPoolScheduler.Default).RetryAfterDelay<TSource, TException>(dueTime, --count);
更新:添加了重载以引入特定的调度程序。
public static IObservable<TSource> RetryAfterDelay<TSource, TException>(
this IObservable<TSource> source,
TimeSpan retryDelay,
int retryCount,
IScheduler scheduler) where TException : Exception
{
return source.Catch<TSource, TException>(
ex =>
{
if (retryCount <= 0)
{
return Observable.Throw<TSource>(ex);
}
return
source.DelaySubscription(retryDelay, scheduler)
.RetryAfterDelay<TSource, TException>(retryDelay, --retryCount, scheduler);
});
}
正在使用的那个:
public static IObservable<T> DelayedRetry<T>(this IObservable<T> src, TimeSpan delay)
{
Contract.Requires(src != null);
Contract.Ensures(Contract.Result<IObservable<T>>() != null);
if (delay == TimeSpan.Zero) return src.Retry();
return src.Catch(Observable.Timer(delay).SelectMany(x => src).Retry());
}
根据马库斯的回答,我写了以下内容:
public static class ObservableExtensions
{
private static IObservable<T> BackOffAndRetry<T>(
this IObservable<T> source,
Func<int, TimeSpan> strategy,
Func<int, Exception, bool> retryOnError,
int attempt)
{
return Observable
.Defer(() =>
{
var delay = attempt == 0 ? TimeSpan.Zero : strategy(attempt);
var s = delay == TimeSpan.Zero ? source : source.DelaySubscription(delay);
return s
.Catch<T, Exception>(e =>
{
if (retryOnError(attempt, e))
{
return source.BackOffAndRetry(strategy, retryOnError, attempt + 1);
}
return Observable.Throw<T>(e);
});
});
}
public static IObservable<T> BackOffAndRetry<T>(
this IObservable<T> source,
Func<int, TimeSpan> strategy,
Func<int, Exception, bool> retryOnError)
{
return source.BackOffAndRetry(strategy, retryOnError, 0);
}
}
我更喜欢它,因为
- 它不会修改
attempts
而是使用递归。 - 它不使用
retries
,但传递尝试retryOnError
的次数
这是我在研究 Rxx 如何做到这一点时想到的另一个稍微不同的实现。因此,它在很大程度上是Rxx方法的精简版本。
签名与马库斯的版本略有不同。您指定要重试的异常类型,延迟策略采用异常和重试计数,因此每次连续重试等可能会有更长的延迟。
我不能保证它是防错的,或者最好的方法,但它似乎有效。
public static IObservable<TSource> RetryWithDelay<TSource, TException>(this IObservable<TSource> source, Func<TException, int, TimeSpan> delayFactory, IScheduler scheduler = null)
where TException : Exception
{
return Observable.Create<TSource>(observer =>
{
scheduler = scheduler ?? Scheduler.CurrentThread;
var disposable = new SerialDisposable();
int retryCount = 0;
var scheduleDisposable = scheduler.Schedule(TimeSpan.Zero,
self =>
{
var subscription = source.Subscribe(
observer.OnNext,
ex =>
{
var typedException = ex as TException;
if (typedException != null)
{
var retryDelay = delayFactory(typedException, ++retryCount);
self(retryDelay);
}
else
{
observer.OnError(ex);
}
},
observer.OnCompleted);
disposable.Disposable = subscription;
});
return new CompositeDisposable(scheduleDisposable, disposable);
});
}
这是我想出的那个。
不想将单个重试的项目连接成一个序列,而是在每次重试时发出整个源序列 - 因此运算符返回一个IObservable<IObservable<TSource>>
。如果不需要,可以简单地将其Switch()
回一个序列。
(背景:在我的用例中,源是一个热序列,我GroupByUntil
出现一个关闭组的项目。如果在两次重试之间丢失了此项,则组永远不会关闭,从而导致内存泄漏。拥有序列序列仅允许对内部序列进行分组(或异常处理或......
/// <summary>
/// Repeats <paramref name="source"/> in individual windows, with <paramref name="interval"/> time in between.
/// </summary>
public static IObservable<IObservable<TSource>> RetryAfter<TSource>(this IObservable<TSource> source, TimeSpan interval, IScheduler scheduler = null)
{
if (scheduler == null) scheduler = Scheduler.Default;
return Observable.Create<IObservable<TSource>>(observer =>
{
return scheduler.Schedule(self =>
{
observer.OnNext(Observable.Create<TSource>(innerObserver =>
{
return source.Subscribe(
innerObserver.OnNext,
ex => { innerObserver.OnError(ex); scheduler.Schedule(interval, self); },
() => { innerObserver.OnCompleted(); scheduler.Schedule(interval, self); });
}));
});
});
}