反应式扩展 操作员触发的次数比我预期的要多



谁能告诉我是什么导致了以下代码中这种看似奇怪的 Do 行为?我希望每个 OnNext 调用一次 Do 处理程序。

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using NUnit.Framework;
[TestFixture]
public class WhyDoesDoActSoWierd
{
[Test]
public void ButWhy()
{
var doCount = 0;
var observable = new Subject<int>();
var stream = observable.Do( x => doCount++ );
var subs =
(from x in stream
where x % 2 == 1
from y in stream
where y % 2 == 0
&& y == x + 1
select new { x, y })
.Subscribe( x => Console.WriteLine( "{0}, {1}", x.x, x.y ) );
observable.OnNext( 1 );
// doCount == 1
observable.OnNext( 2 );
// doCount == 3
observable.OnNext( 3 );
// doCount == 5
observable.OnNext( 4 );
// doCount == 8
observable.OnNext( 5 );
// doCount == 11
observable.OnNext( 6 );
// doCount == 15
Assert.AreEqual( 6, doCount );
}
}

这里的行为是完全正常的。原因是您不仅有一个订阅,还拥有多个订阅。由于查询中两个可观察量之间的笛卡尔积,您拥有的Do数量比预期的要多。

让我们看一下与您的查询不同的(但类似)查询。

var doCountX = 0;
var doCountY = 0;
Action dump = () =>
Console.WriteLine("doCountX = {0}, doCountY = {1}", doCountX, doCountY);
var observable = new Subject<int>();
var streamX = observable.Do(x => doCountX++);
var streamY = observable.Do(x => doCountY++);
var query =
from x in streamX
from y in streamY
select new { x, y };
query.Subscribe(z => Console.WriteLine("{0}, {1}", z.x, z.y));
dump();
for (var i = 1; i <= 6; i++)
{
observable.OnNext(i);
dump();
}

由此产生的输出是:

doCountX = 0, doCountY = 0
doCountX = 1, doCountY = 0
1, 2
doCountX = 2, doCountY = 1
1, 3
2, 3
doCountX = 3, doCountY = 3
1, 4
2, 4
3, 4
doCountX = 4, doCountY = 6
1, 5
2, 5
3, 5
4, 5
doCountX = 5, doCountY = 10
1, 6
2, 6
3, 6
4, 6
5, 6
doCountX = 6, doCountY = 15

这是可以预期的doCountX = 0, doCountY = 0的初始转储,因为此对dump()的调用发生在对OnNext的任何调用之前。

但是,当我们第一次调用OnNext时,我们不会获得查询生成的值,因为第二个可观察streamY尚未订阅。

只有当第二次调用OnNext时,我们才会从查询中获取一个值,该值恰好是第一个与第二个配对的OnNext值。现在,这也创建了一个新的订阅来streamY,等待下一个值。

因此,我们现在从streamX中获得了前两个值,等待序列中的下一个值。因此,当调用OnNext(3)时,我们得到两个结果。

每次发生这种情况时,您都可以看到Do呼叫的数量不断增加doCountY不断增加。

事实上,给定这个非常简单的SelectMany查询,公式为:

doCountY = n * (n - 1) / 2

因此,通过OnNext生成的 6 个值doCountY等于6 * 5 / 215.

使用 10 个值运行可提供10 * 9 / 245值。

因此,SelectMany实际上所做的订阅比您想象的要多得多。这通常是为什么您通常只使用它来链接每个仅产生单个值的可观察量,以防止订阅呈指数级爆炸式增长。

现在有意义吗?

作为Enigmativity丰满的答案之外,这里有两件事:

  1. 枚举这样的可观察量是懒惰地组合的。 就像在通过枚举器之前不会期望评估可枚举查询中的任何组合器一样,您应该期望使用者将看到为该管道评估的所有组合器,其次数与订阅管道的观察者数一样多。 简而言之,x <- stream, y <- stream已经成功了两次。

  2. 理解被重写为:

    stream1.Where(x => x % 2 == 1)
    .SelectMany(x => stream2
    .Where(y => y % 2 == 0 && y == x + 1)
    .Select(y => new { x, y })
    );
    

    对于收到的 x 的每个值,您将订阅与谓词匹配的流的所有值 - 这变得很多。查询推导通常以SelectMany/Join/GroupBy的形式去糖 - 您实际上最终将使用的大多数 Rx 可能更好地用其他运算符来表达 - 例如MergeZip甚至Join

有一个问题与您刚才提出的问题非常相似:反应式扩展是否评估了太多次?

关于为什么这是 Rx 中的预期行为,有一些讨论。

相关内容

  • 没有找到相关文章