无法使用 FirstAsync 获取 Rx 窗口的第一个窗口



我想把一个可观察序列(IObservable)分成几个可观察序列(标准很复杂,但为了演示,我们可以简单地使用count)。很明显,这需要Window

    private static IObservable<int> GenerateSequence()
    {
        return Observable.Range(1, 5);
    }
    await GenerateSequence()
            .Window(2)
            .Select((w, i) => new {i, w})
            .Do(w => w.w.Dump($"Window {w.i}"));

输出如预期:

Window 0-->1
Window 0-->2
Window 0-->X
Window 1-->3
Window 1-->4
Window 1-->X
Window 2-->5
Window 2-->X

(X标记OnCompleted)

现在,出于某种原因,我只想要这些序列中的第一个。因此,FirstAsync:

    (await GenerateSequence()
        .Window(2)
        .FirstAsync())
        .Dump("Window");

但奇怪的是,我根本没有得到任何输出,好像我从FirstAsync得到的序列完全死了。

我对Rx有点陌生,所以我完全不知道这里到底发生了什么和为什么。

EDIT:你的答案有效。您可能想要将Window替换为Buffer。它们实际上是相同的,除了Buffer是用于类似的场景。它返回一个列表而不是数组:

var t = GenerateSequence()
    .Buffer(2)
    .FirstAsync()
    //.Select(list => list.ToArray()) //If you're particular about Task<int[]> over Task<IList<int>>
    .ToTask();

另外,.SelectMany(i => i)可以用.Merge()代替。

旧答案:


我一般不把Rx和await混在一起。它显然是被支持的,但似乎…不直观。以下代码转储第一个窗口的输出:
GenerateSequence()
    .Window(2)
    .FirstAsync()
    .Subscribe(i => i.Dump("Window"));

从每个窗口转储第一项:

GenerateSequence()
    .Window(2)
    .Select(o => o.FirstAsync())
    .Subscribe(i => i.Dump("Window"));

从你的问题看不清楚你想要哪一个

实际上,解决方案非常简单(尽管我仍然不明白在我的原始代码中发生了什么)-使用SelectMany来平化序列:

GenerateSequence()
.Window(2)
.FirstAsync()
.Concat()
.Dump("SO");

按预期产生:

SO-->1
SO-->2
SO-->X

然后,为了实现我最初的目标-即,改变monad并返回结果为Task<int[]>:

GenerateSequence()
.Window(2)
.FirstAsync()
.Concat()
.ToArray()
.ToTask();

相关内容

  • 没有找到相关文章

最新更新