我搜索了一个重复项,但没有找到。我有一个嵌套的可观察IObservable<IObservable<T>>
,我想把它展平为一个IObservable<T>
。我不想使用Concat
运算符,因为它会延迟对每个内部可观察量的订阅,直到完成上一个可观察量。这是一个问题,因为内部可观察量是冷的,我希望它们在外部可观察量发出后立即开始发出T
值。我也不想使用Merge
运算符,因为它弄乱了发出值的顺序。下面的大理石图显示了Merge
运算符的问题(就我的情况而言)行为,以及理想的合并行为。
Stream of observables: +----1------2-----3----|
Observable-1 : +--A-----------------B-------|
Observable-2 : +---C---------------------D------|
Observable-3 : +--E--------------------F-------|
Merge (undesirable) : +-------A-------C----E----B-----------D---F-------|
Desirable merging : +-------A-----------------B-------C---D------EF---|
可观察量-1 发出的所有值都应在可观察量-2 发出的任何值之前。Observable-2 和 Observable-3 也应该如此,依此类推。
我喜欢Merge
运算符的地方是它允许配置内部可观察量的最大并发订阅数。我想使用我尝试实现的自定义MergeOrdered
运算符保留此功能。这是我的在建方法:
public static IObservable<T> MergeOrdered<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency = Int32.MaxValue)
{
return source.Merge(maximumConcurrency); // How to make it ordered?
}
下面是一个使用示例:
var source = Observable
.Interval(TimeSpan.FromMilliseconds(300))
.Take(4)
.Select(x => Observable
.Interval(TimeSpan.FromMilliseconds(200))
.Select(y => $"{x + 1}-{(char)(65 + y)}")
.Take(3));
var results = await source.MergeOrdered(2).ToArray();
Console.WriteLine($"Results: {String.Join(", ", results)}");
输出(不理想):
Results: 1-A, 1-B, 2-A, 1-C, 2-B, 3-A, 2-C, 3-B, 4-A, 3-C, 4-B, 4-C
理想的输出是:
Results: 1-A, 1-B, 1-C, 2-A, 2-B, 2-C, 3-A, 3-B, 3-C, 4-A, 4-B, 4-C
澄清:关于值的顺序,值本身无关紧要。重要的是它们起源的内序列的顺序,以及它们在该序列中的位置。应首先发出第一个内部序列中的所有值(按其原始顺序),然后发出第二个内部序列中的所有值,然后发出第三个内部序列的所有值,依此类推。
这个可观察量无法知道任何内部可观察量的最后一个值是否是应该产生的第一个值。
例如,您可以有以下内容:
Stream of observables: +--1---2---3--|
Observable-1 : +------------B--------A-|
Observable-2 : +--C--------D-|
Observable-3 : +-E--------F-|
Desirable merging : +------------------------ABCDEF|
在这种情况下,我会这样做:
IObservable<char> query =
sources
.ToObservable()
.Merge()
.ToArray()
.SelectMany(xs => xs.OrderBy(x => x));
我通过使用Merge
、Merge(1)
¹和Replay
运算符的组合,找到了这个问题的解决方案。Merge
运算符强制执行并发策略,Merge(1)
运算符强制执行有序的顺序发射。为了防止Merge
弄乱发出值的顺序,引入了内部序列的额外包装。每个内部序列都投影到一个IObservable<IObservable<T>>
,该立即发出内部序列,并在内部序列完成时完成。此包装是使用Observable.Create
方法实现的:
public static IObservable<T> MergeOrdered<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency = Int32.MaxValue)
{
return source.Select(inner => inner.Replay(buffered => Observable
.Create<IObservable<T>>(observer =>
{
observer.OnNext(buffered);
return buffered.Subscribe(_ => { }, observer.OnError, observer.OnCompleted);
})))
.Merge(maximumConcurrency)
.Merge(1);
}
Replay
运算符缓冲内部序列发出的所有消息,以便在Merge
订阅和Merge(1)
订阅之间的同时不会丢失这些消息。
有趣的是,由于包装,创建了一个中间IObservable<IObservable<IObservable<T>>>
序列。然后,这个可怕的东西被打开两次,第一次是Merge
,第二次是Merge(1)
操作员。
这不是一个完全有效的解决方案,因为没有理由缓冲Merge(1)
当前订阅的内部序列。不过,优化这种低效率并非微不足道,所以我将保持原样。在每个子序列包含少量元素的情况下,此缺陷的影响应该可以忽略不计。在这些情况下,尝试修复它甚至可能导致弊大于利。
¹理想情况下,我想使用Concat
而不是等效但效率较低的Merge(1)
运算符。不幸的是,Concat
运算符在当前版本的 Rx 库 (5.0.0) 中表现得很奇怪。在相当复杂的查询中使用Concat
时,我什至遇到了死锁行为,通过切换到Merge(1)
运算符解决了这个问题。
注意:此答案的原始实现,具有用于控制并发而不是Merge
运算符的SemaphoreSlim
,可以在第 1 次修订中找到。基于Merge
的实现应该更好,因为它不涉及即发即弃的任务延续,并且对内部序列的订阅同步发生,而不是卸载到ThreadPool
。