压缩或合并两个IObservable序列,其中一个序列可能失败



我有一个要求,可以看到两个I可观察序列的合并,但其中一个序列可能会失败,而不会影响观察者。

因此,以Rx简介中的数字和字符序列为例:

nums  --0--1--2|
chars --a--b--c--d--e--f|
result -0--1--2|
        a  b  c|
Prints:
0 & a
1 & b
2 & c

如果不允许其中一个序列失败(抛出异常),但不停止订阅的观察程序的执行,您将如何进行此操作?

nums  --0--1--2--3|
chars --a--b--X--d--e--f|
result -0--1--2--3|
        a  b  X  d|
Prints:
0 & a
1 & b
2 & NO VALUE
3 & d

我对Catch、Finally、OnErrorResumeText等以及如何将其应用于我的用例有点困惑。非常感谢所有的帮助。

请记住Rx语法。序列定义如下:

OnNext* (OnError | OnCompleted)

换句话说,一个序列有零个或多个OnNext事件,后面跟着一个OnError或一个OnCompleted。(这忽略了时间方面,因为它可能是一个无限长的流,所以从未完成的流仍然有效)。

关键是一个流最多只能有一个错误。一旦发送了OnError,就不能再发送任何事件。

现在,可以用OnErrorResumeText之类的东西来包装一个性能不好的源流,但你必须有一个新的流来恢复包装。。。第一股水流死了。

通常,在这种情况下,你会有一些潜在的热点事件源,你希望能够恢复"直播"(即,你不想从第一个事件开始)。

设置示例

我将模拟这个场景。不幸的是,会有相当多的设置,但实际上创建这些可恢复流通常需要一些工作!

我将首先创建一个工厂函数,它可以订阅每秒发出一次的精心设计的热字母源。这将被调用以"恢复"对基础数据的订阅。

首先,我们可以通过用字母数组压缩计时器来创建一个无错误的字母流:

var letters = new [] { "a", "b", "c", "d", "e", "f" };
var letterStream = Observable.Interval(TimeSpan.FromSeconds(1))
                             .Zip(letters, (_, ls) => ls);

现在,我们将Publish这一点,使其成为热门——对已发布流的订阅将从任何地方获取:

var hotLetterStream = letterStream.Publish();

现在,我们可以创建一个可观察对象,它将在订阅时订阅直播流,如果它看到字母"c",则会失败。这有点棘手,但这里不要太担心——这不是示例的要点,我们只需要一个流,它可以为我们提供底层的热门数据,并在特定值上失败。它展示了可观测流的特性,即它们只能出错一次。

var failStream = hotLetterStream.SelectMany(x =>  x == "c"
        ? Observable.Throw<string>(new Exception()) 
        : Observable.Return(x));  

现在我们可以设置数字流了——它只会在4秒内每秒返回一个基于零的值:

var numberStream = Observable.Interval(TimeSpan.FromSeconds(1)).Take(4);

现在,我们可以将流与Zip结合起来——我们使用Catch将失败的字母替换为"无值"单值流,然后使用Repeat无缝连接到热点源的全新订阅:

var combinedStream = numberStream.Zip(
    failStream.Catch<string, Exception>(ex => Observable.Return("NO VALUE"))
              .Repeat(), (ns,ls) => ns + " & " + ls);

现在我们可以订阅这个:

combinedStream.Subscribe(Console.WriteLine);

最后,我们必须用Connect"打开"已发布的流,以启动值流动:

hotLetterStream.Connect();

如果您引入nuget包rx-main,则此代码将按编写的方式运行,并产生以下输出:

0 & a
1 & b
2 & NO VALUE
3 & d

通信错误

现在,在这个琐碎的例子中,我们通过用"NO VALUE"字符串替换字母来传达错误。这对这个例子来说很好,可能对你有用。然而,在现实中,处理这样一个失败的流可能会导致对代码进行混乱的检查。

幸运的是,有一个干净的解决方案。您想要使用Error monad的概念。这在像Haskell这样的语言中得到了本机支持,但这个想法可以很容易地在Rx中被采用。它的工作方式是提供一个特殊的容器——很像Nullable<T>在.NET中的工作方式——但它不是保存值或null,而是保存值或异常。

它可以被认为是更通用的Either<TLeft, TRight> monad的专门化,它有左侧和右侧。这是由Dave Sexton在他对Rx的出色Rxx扩展中实现的。链接直接指向对Either的讨论。创建自己的版本也很容易。

因此,与其订阅IObservable<T>,不如将您的T封装在IObservable<Either<TException, T>>中。如果值是好的,则使用Either.Right发送它-如果它是坏的,则用Either.Left转发异常(这是一般约定-例如,好的"右"坏的"左")。您甚至可以创建一个Error类型来包装Either并将TLeft限制为一个异常。关心值的运算符可以检查左属性和右属性,这些运算符不会只是传递值而忽略它是否是错误,就像.NET函数可以使用Nullable<T>而不必关心值是否为null一样。

通过这种方式,您可以以一种清晰易懂的方式将异常沿着漫长的可观察链传播到最终订阅者。

这里的另一个要点是,区分您可能想要通信的数据中的错误和导致系统崩溃的流管道中的错误通常很重要。不要急于使用OnError来传递一段糟糕的数据,因为这会扼杀流。相反,可以考虑直接发送一个错误值。

此示例的完整代码要点如下:https://gist.github.com/james-world/904ca7383a8f1cd349b9

相关内容

  • 没有找到相关文章

最新更新