我正在从TCP端点读取数据流。这可能会出错,或者出错,此时我想尝试连接到次要端点。
我想使用Observable.重复创建一个无限的端点列表,然后选择其中一个来创建流。如果出现错误,我想切换到下一个。我该怎么办?
问题在于在重新订阅时保持使用源的状态。可能有比使用IEnumerator
更好的替代方案,但可能不使用Retry
语义。
//sample source which throws an error after 5 entries
Func<string, IObservable<string>> sampleSource =
endpoint =>
Observable
.Interval(TimeSpan.FromSeconds(0.5))
.Select(i => $"{endpoint} : {i + 1}")
.Take(5)
.Concat(Observable.Throw<string>(new Exception()));
//infinite sequence of sources
var endpoints = new string[] { "source1", "source2" }.Repeat();
var sequence =
Observable.Using
(
endpoints.GetEnumerator,
enumerator => Observable.Create<string>(observer =>
{
enumerator.MoveNext();
observer.OnNext(enumerator.Current);
return Disposable.Empty;
})
.SelectMany(sampleSource)
.Retry()
);
sequence.Subscribe(c => Console.WriteLine(c));
Console.ReadLine();
输出:
source1 : 1
source1 : 2
source1 : 3
source1 : 4
source1 : 5
source2 : 1
source2 : 2
source2 : 3
source2 : 4
source2 : 5
source1 : 1
source1 : 2
一些示例代码会很有帮助,这样回答者就不必编造内容并添加糟糕的电影引用。。。
void Main()
{
var initialObservable = new Subject<int>();
var observableToContinueWithAfterError = Observable.Return(10);
var chainedObservable = initialObservable.Catch(observableToContinueWithAfterError);
chainedObservable.Subscribe(
i => Console.WriteLine(i),
e => Console.WriteLine("Too bad AC ain't in charge no more (this won't happen, Observable.Catch swallows the exception)."),
() => Console.WriteLine("Keep the change you..."));
initialObservable.OnNext(1);
initialObservable.OnNext(2);
initialObservable.OnError(new Exception());
}
有关Observable.Catch
的更多信息,请参阅此处。