如何合并两个可观察量并提前完成



内置Merge运算符的行为是在两个源都完成时完成。我正在寻找此运算符的变体,该运算符生成一个可观察量,当两个源可观察量中的任何一个完成时完成。例如,如果第一个可观察量成功完成,然后第二个可观察量完成并出现异常,我希望忽略此异常。

我想出了一个实现,它将一个特殊的哨兵异常连接到两个枚举,然后合并的序列捕获并抑制此异常。我想知道我是否错过了一个更简单的解决方案。

/// <summary>
/// Merges elements from two observable sequences into a single observable sequence,
/// that completes as soon as any of the source observable sequences completes.
/// </summary>
public static IObservable<T> MergeUntilAnyCompletes<T>(this IObservable<T> first,
IObservable<T> second)
{
var sentinel = new Exception();
first = first.Concat(Observable.Throw<T>(sentinel));
second = second.Concat(Observable.Throw<T>(sentinel));
// Concat: Concatenates the second observable sequence to the first
// observable sequence upon successful termination of the first.
return first.Merge(second)
.Catch(handler: (Exception ex) =>
ex == sentinel ? Observable.Empty<T>() : Observable.Throw<T>(ex));
// Catch: Continues an observable sequence that is terminated by an exception
// of the specified type with the observable sequence produced by the handler.
}

有趣的技巧:

public static IObservable<T> MergeUntilAnyCompletes<T>(this IObservable<T> first,
IObservable<T> second)
{
return Observable.Merge(
first.Materialize(),
second.Materialize()
).Dematerialize();
}

Materialize将可观察量转换为通知的可观察量,因此Merge将不再禁止显示OnCompleted通知。当您Dematerialize时,该运算符将看到OnCompleted并停止。

旁注:如果你想要一些关于Materialize/Dematerialize的有趣,稍微学术的阅读,请阅读这篇博文。他写的是Ix,但同样的事情也适用于Rx。

相关内容

  • 没有找到相关文章

最新更新