非重播热可观察



原始问题

有一个场景,我有多个IObservable序列,我想与Merge组合,然后听。但是,如果其中一个产生错误,我不希望它使其他流的所有内容崩溃,以及重新订阅序列(这是一个"持久"序列)。

我通过在合并之前将Retry()附加到流来做到这一点,即:

IEnumerable<IObservable<int>> observables = GetObservables();
observables
    .Select(o => o.Retry())
    .Merge()
    .Subscribe(/* Do subscription stuff */);

但是,当我想对此进行测试时,问题就出现了。我想测试的是,如果observables中的一个IObservable产生了OnError,那么其他的应该仍然能够通过发送它们的值,并且它们应该得到处理

。我

以为我只用两个Subject<int>代表observables中的两个IObservable;一个发送OnError(new Exception()),另一个,之后,发送OnNext(1)。但是,似乎Subject<int>将重播新订阅的所有先前值(实际上Retry()),将测试变成无限循环。

我试图通过创建一个手动IObservable来解决它,该在第一个订阅和后来的空序列上产生错误,但感觉很黑客:

var i = 0;
var nErrors = 2;
var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create<int>(o => {
    i++;
    if (i < nErrors) {
        return Observable.Throw<int>(new Exception()).Subscribe(o);
    } else {
        return Observable.Empty<int>().Subscribe(o);
    }
});

我是否以错误的方式使用Subject或思考Retry()?对此还有其他想法吗?您将如何解决这种情况?

更新

好的,这里有一张大理石图,上面有我想要的,并认为Retry()做了什么。

o = message, X = error.
------o---o---X
               
     Retry() -> ---o---o---X
                             
                   Retry() -> ...

我的问题可能更多在于我没有一个好的股票类来使用前置测试,因为Subject想重播我之前的所有错误。

更新 2

这是一个测试用例,显示了我对重播其值Subject的意思。如果我说它以冷酷的方式做到这一点,我是否正确使用这个词?我知道Subject是一种创建热可观察量的方法,但这种行为对我来说仍然感觉"冷"。

var onNext = false;
var subject = new Subject<int>();
subject.Retry().Subscribe(x => onNext = true);
subject.OnError(new Exception());
subject.OnNext(1);
Assert.That(onNext, Is.True);

根据您更新的需求(您希望重试失败的可观察量,而不仅仅是想要忽略它们),我们可以提出一个有效的解决方案。

首先,请务必了解冷可观察量(在每个订阅上重新创建)和热可观察量(无论订阅如何都存在)之间的区别。您无法Retry()热可观察量,因为它不知道如何重新创建基础事件。也就是说,如果出现可观察到的热门错误,它就会永远消失。

Subject创建一个热可观察量,从某种意义上说,您可以在没有订阅者的情况下调用OnNext,并且它将按预期运行。要将热可观察量转换为冷可观察量,可以使用 Observable.Defer ,它将包含该可观察量的"订阅时创建"逻辑。

综上所述,以下是修改为执行此操作的原始代码:

var success = new Subject<int>();
var error = new Subject<int>();
var observables = new List<IObservable<int>> { Observable.Defer(() => {success = new Subject<int>(); return success.AsObservable();}), 
                                               Observable.Defer(() => {error = new Subject<int>(); return error.AsObservable();}) };                                            
observables
.Select(o => o.Retry())
.Merge()
.Subscribe(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done"));

和测试(类似于之前):

success.OnNext(1);
error.OnError(new Exception("test"));
success.OnNext(2);
error.OnNext(-1);
success.OnCompleted();
error.OnCompleted();

并且输出如预期的那样:

1
2
-1
done

当然,您需要根据可观察的基础来大幅修改此概念。使用受试者进行测试与实际使用它们不同。

我还想指出,这条评论:

但是,似乎主题将重播所有以前的值 新订阅(实际上是 Retry() ),将测试变成 一个无限循环。

不是真的 - Subject不是这样表现的。代码的其他一些方面会导致无限循环,因为Retry重新创建订阅,并且订阅在某个时候创建错误。


原始答案(用于完成)

问题是Retry()没有做你想让它做的事情。 从这里:

http://msdn.microsoft.com/en-us/library/ff708141(v=vs.92).aspx

重复源可观察序列以重试计数时间或直到 它成功终止。

这意味着Retry将不断尝试并重新连接到基础可观察量,直到它成功并且不会引发错误。

我的理解是,您实际上希望忽略可观察量中的异常,而不是重试。这将执行您想要的操作:

observables
.Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>())))
.Merge()
.Subscribe(/* subscription code */);

这使用 Catch 来捕获具有异常的可观察量,并在该点将其替换为空的可观察量。

以下是使用受试者的完整测试:

var success = new Subject<int>();
var error = new Subject<int>();
var observables = new List<IObservable<int>> { success.AsObservable(), error.AsObservable() };
observables
.Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>())))
.Merge()
.Subscribe(Observer.Create<int>(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done")));
success.OnNext(1);
error.OnError(new Exception("test"));
success.OnNext(2);
success.OnCompleted();

正如预期的那样,这会产生:

1
2
done

相关内容

  • 没有找到相关文章

最新更新