是否可枚举<T>。Count() 实际上适用于 IObservable<T>?



有没有一个例子告诉我Observable.Count<TSource> Method是如何工作的?我想出的示例似乎返回了一个包装在可观察量中的计数,而不是预期的计数。

例如,我希望从中返回1

System.Diagnostics.Debug.WriteLine((Observable.Return<string>("Hello world!")).Count());

将来会返回1吗(因为毕竟它是一个异步序列)?还是我错过了一些基本的东西?在撰写本文时,我实际上假设.Count()将返回T的结果,并且只要结果被推出,就会随着时间的推移而增长。真?是的。

Rx 中的聚合运算符的工作方式与 LINQ 中的略有不同 - 它们不会立即返回值,而是返回未来结果(即,我们只能在可观察量完成后知道最终计数是多少)。

所以如果你写:

Observable.Return("foo").Count().Subscribe(x => Console.WriteLine(x));
>>> 1

因为,毕竟它是一个异步序列

这实际上并不完全正确。在这里,一切都会立即运行,只要有人打电话Subscribe.上面的这段代码没有任何异步,没有额外的线程,一切都发生在订阅上。

我认为

使用立即返回的可观察量并使用 async/await 语法(如 rasx 在注释中所做的那样)会让事情变得过于混乱。

让我们创建一个包含 5 个元素的流,这些元素每秒返回一个,然后完成:

private IObservable<long> StreamWith5Elements()
{
    return Observable.Interval(TimeSpan.FromSeconds(1))
                     .Take(5);
}

我们可以使用 async/await 魔法调用它,如这个 LINQPad 友好示例所示:

void Main()
{
    CountExampleAsync().Wait();
}
private async Task CountExampleAsync()
{
    int result = await StreamWith5Elements().Count();
    Console.WriteLine(result);
}

但它误导了这里发生的事情 - Count()返回一个IObservable<int>,但 Rx 对await非常友好,并将该结果流转换为Task<int> - 然后等待交回该任务的int结果。

当你对一个IObservable<T>使用 await 时,你隐含地说你希望该可观察对象用单个结果调用OnNext(),然后调用OnComplete()。实际发生的情况是,您将获得一个返回流终止前发送的最后一个值的Task<T>。(类似于AsyncSubject<T>的行为方式)。

这很有用,因为这意味着任何流都可以映射到Task,但它确实需要仔细考虑。

现在,上面的例子相当于以下更传统的Rx:

void Main()
{
    PlainRxCountExample();
}
private void PlainRxCountExample()
{
    IObservable<int> countResult = StreamWith5Elements().Count();
    countResult.Subscribe(count => Console.WriteLine(count));
    /* block until completed for the sake of the example */
    countResult.Wait();
}

在这里你可以看到Count()确实在返回一个 int 流 - 以提供异步计数。仅当源流完成时,它才会返回。

在 Rx 的早期,Count() 实际上是同步的。

然而,这并不是一个非常有用的状态,因为它"退出Monad" - 即将您带出IObservable<T>并阻止您使用Rx运算符进行进一步组合。

一旦你开始"在流中思考",Count() 的异步性质真的非常直观,因为当然你只能在流完成后提供流的计数 - 为什么要为此徘徊?? :)

相关内容

  • 没有找到相关文章

最新更新