Observable.Window 和 .Zip不像我期望的那样运行



我正在尝试将IEnumerable变成一个IObservable,以相隔一秒的块交付其项目。

var spartans = Enumerable.Range(0, 300).ToObservable();
spartans
    .Window(30)
    .Zip(Observable.Timer(DateTimeOffset.Now, TimeSpan.FromMilliseconds(1000)), (x, _) => x)
    .SelectMany(w => w)
    .Subscribe(
        n => Console.WriteLine("{0}", n),
        () => Console.WriteLine("all end"));

使用此代码,唯一打印的内容是在十秒后"全部结束"。如果我删除.Zip则整个序列会立即打印,如果我删除.Window.SelectMany则整个序列每秒打印一个项目。如果我偷看传递给SelectMany的lambda内部可观察到的"窗口",我可以看到它是空的。我的问题是,为什么?

出现问题是因为Window如何使用计数 - 而这个不是特别直观!

如您所知,Window提供流。但是,对于计数,子流是"暖的"——即,当此流的观察者在其OnNext处理程序中收到一个新窗口时,它必须在将控制权交还给可观察对象之前订阅它,否则事件将丢失。

Zip 并不"知道"它正在处理这种情况,并且不会让您有机会在获取下一个子窗口之前订阅每个子窗口。

如果删除Zip ,则会看到所有事件,因为SelectMany在接收子窗口时订阅所有子窗口。

最简单的解决方法是使用 Buffer 而不是 Window - 进行一次更改,您的代码就可以工作了。那是因为Buffer的工作方式与SelectMany非常相似,通过这样做有效地保留了窗口:

Window(30).SelectMany(x => x.ToList())

这些元素不再是暖窗口,而是结晶为列表,您的Zip现在将按预期工作,以下SelectMany将列表展平。

重要的性能注意事项

请务必注意,此方法将导致一次性运行整个IEnumerable<T>。如果应该延迟计算源枚举(这通常是可取的(,则需要采用不同的方式。使用下游可观察对象来控制上游可观察对象的步伐是棘手的。

让我们用帮助程序方法替换你的枚举对象,以便我们可以看到每批 30 个被评估的时间:

static IEnumerable<int> Spartans()
{
    for(int i = 0; i < 300; i++)
    {
        if(i % 30 == 0)
            Console.WriteLine("30 More!");
        
        yield return i;            
    }
}

并像这样使用它(此处使用Buffer"修复",但行为与Window相似(:

Spartans().ToObservable()
          .Buffer(30)
          .Zip(Observable.Timer(DateTimeOffset.Now, 
                                TimeSpan.FromMilliseconds(1000)),
               (x, _) => x)
          .SelectMany(w => w)
          .Subscribe(
              n => Console.WriteLine("{0}", n),
              () => Console.WriteLine("all end")); 

然后,您会看到这种输出,演示如何一次性耗尽可枚举的源:

30 More!
0
1
...miss a few...
29
30 More!
30 More!
30 More!
30 More!
30 More!
30 More!
30 More!
30 More!
30 More!
30
31
32
...etc...

要真正调整源的节奏,而不是直接使用ToObservable()您可以执行以下操作。请注意,Spartans() IEnumerable<T>上的Buffer操作来自 nuget 包Ix-Main - 由 Rx 团队添加以堵塞IEnumerable<T> monad 上的几个孔:

var spartans = Spartans().Buffer(30);
var pace = Observable.Timer(DateTimeOffset.Now, TimeSpan.FromMilliseconds(1000));
       
pace.Zip(spartans, (_,x) => x)
    .SelectMany(x => x)
    .Subscribe(
        n => Console.WriteLine("{0}", n),
        () => Console.WriteLine("all end"));  

并且输出成为可能更理想的延迟计算输出:

30 More!
0
1
2
...miss a few...
29
30 More!
30
31
32
...miss a few...
59
30 More!
60
61
62
...etc

我不确定如何使其与 Windows 一起工作,但是这个呢:

var spartans = Enumerable.Range(0, 300).ToObservable();
spartans
    .Select(x => Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => x))
    .Merge(30);

相关内容

  • 没有找到相关文章

最新更新