我正在尝试将IEnumerable
变成一个IObservable
,以相隔一秒的块交付其项目。
var spartans = Enumerable.Range(0, 300).ToObservable();
spartans
.Window(30)
.Zip(Observable.Timer(DateTimeOffset.Now, TimeSpan.FromMilliseconds(1000)), (x, _) => x)
.SelectMany(w => w)
.Subscribe(
n => Console.WriteLine("{0}", n),
() => Console.WriteLine("all end"));
使用此代码,唯一打印的内容是在十秒后"全部结束"。如果我删除.Zip
则整个序列会立即打印,如果我删除.Window
并.SelectMany
则整个序列每秒打印一个项目。如果我偷看传递给SelectMany
的lambda内部可观察到的"窗口",我可以看到它是空的。我的问题是,为什么?
出现问题是因为Window
如何使用计数 - 而这个不是特别直观!
如您所知,Window
提供流。但是,对于计数,子流是"暖的"——即,当此流的观察者在其OnNext
处理程序中收到一个新窗口时,它必须在将控制权交还给可观察对象之前订阅它,否则事件将丢失。
Zip
并不"知道"它正在处理这种情况,并且不会让您有机会在获取下一个子窗口之前订阅每个子窗口。
如果删除Zip
,则会看到所有事件,因为SelectMany
在接收子窗口时会订阅所有子窗口。
最简单的解决方法是使用 Buffer
而不是 Window
- 进行一次更改,您的代码就可以工作了。那是因为Buffer
的工作方式与SelectMany
非常相似,通过这样做有效地保留了窗口:
Window(30).SelectMany(x => x.ToList())
这些元素不再是暖窗口,而是结晶为列表,您的Zip
现在将按预期工作,以下SelectMany
将列表展平。
重要的性能注意事项
请务必注意,此方法将导致一次性运行整个IEnumerable<T>
。如果应该延迟计算源枚举(这通常是可取的(,则需要采用不同的方式。使用下游可观察对象来控制上游可观察对象的步伐是棘手的。
让我们用帮助程序方法替换你的枚举对象,以便我们可以看到每批 30 个被评估的时间:
static IEnumerable<int> Spartans()
{
for(int i = 0; i < 300; i++)
{
if(i % 30 == 0)
Console.WriteLine("30 More!");
yield return i;
}
}
并像这样使用它(此处使用Buffer
"修复",但行为与Window
相似(:
Spartans().ToObservable()
.Buffer(30)
.Zip(Observable.Timer(DateTimeOffset.Now,
TimeSpan.FromMilliseconds(1000)),
(x, _) => x)
.SelectMany(w => w)
.Subscribe(
n => Console.WriteLine("{0}", n),
() => Console.WriteLine("all end"));
然后,您会看到这种输出,演示如何一次性耗尽可枚举的源:
30 More!
0
1
...miss a few...
29
30 More!
30 More!
30 More!
30 More!
30 More!
30 More!
30 More!
30 More!
30 More!
30
31
32
...etc...
要真正调整源的节奏,而不是直接使用ToObservable()
您可以执行以下操作。请注意,Spartans()
IEnumerable<T>
上的Buffer
操作来自 nuget 包Ix-Main
- 由 Rx 团队添加以堵塞IEnumerable<T>
monad 上的几个孔:
var spartans = Spartans().Buffer(30);
var pace = Observable.Timer(DateTimeOffset.Now, TimeSpan.FromMilliseconds(1000));
pace.Zip(spartans, (_,x) => x)
.SelectMany(x => x)
.Subscribe(
n => Console.WriteLine("{0}", n),
() => Console.WriteLine("all end"));
并且输出成为可能更理想的延迟计算输出:
30 More!
0
1
2
...miss a few...
29
30 More!
30
31
32
...miss a few...
59
30 More!
60
61
62
...etc
我不确定如何使其与 Windows 一起工作,但是这个呢:
var spartans = Enumerable.Range(0, 300).ToObservable();
spartans
.Select(x => Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => x))
.Merge(30);