学术练习,我是反应式扩展的新手,并试图了解这项技术。我给自己设定了一个目标,即制作一个返回 Pi 连续数字的 IObservable(由于不相关的原因,我碰巧现在对 Pi 非常感兴趣)。反应式扩展包含用于制作可观察量的运算符,他们给出的指导是您应该"几乎永远不需要创建自己的 IObsevable"。但是我看不出如何使用现成的运算符和方法做到这一点。让我再阐明一下。
我计划使用一种算法,该算法将涉及对Arctan的泰勒级数的扩展。为了获得 Pi 的下一个数字,我将在该系列中再扩展几个术语。
所以我需要异步进行系列扩展,偶尔将下一个计算数字扔给 IObserver。我显然不想为每个新数字从头开始计算。
有没有办法使用 RX 的内置运算符实现此行为,还是我必须从头开始编写 IObservable?什么策略暗示自己?
对于这样的事情,最简单的方法是使用主题。主题既是IObservable又是IObserver,这听起来有点奇怪,但它允许您像这样使用它们:
class PiCalculator
{
private readonly Subject<int> resultStream = new Subject<int>();
public IObservable<int> ResultStream
{
get { return resultStream; }
}
public void Start()
{
// Whatever the algorithm actually is
for (int i = 0; i < 1000; i++)
{
resultStream.OnNext(i);
}
}
}
因此,在您的算法中,您只需在想要生成下一个值时调用主题上的OnNext
。
然后要使用它,您只需要以下内容:
var piCalculator = new PiCalculator();
piCalculator.ResultStream.Subscribe(n => Console.WriteLine((n)));
piCalculator.Start();
最简单的方法是创建一个Enumerable
,然后转换它:
IEnumerable<int> Pi()
{
// algorithm here
for (int i = 0; i < 1000; i++)
{
yield return i;
}
}
用法(对于冷可观察对象,即每个新的"订阅"都从头开始创建 Pi):
var cold = Pi().ToObservable(Scheduler.ThreadPool);
cold.Take(5).Subscribe(Console.WriteLine);
如果你想让它hot
(每个人都共享相同的底层计算),你可以这样做:
var hot = cold.Publish().RefCount();
这将在第一个订阅者之后开始计算,并在他们全部断开连接时停止计算。这是一个简单的测试:
hot.Subscribe(p => Console.WriteLine("hot1: " + p));
Thread.Sleep(5);
hot.Subscribe(p => Console.WriteLine("hot2: " + p));
这应该只显示hot1
打印一会儿,然后hot2
短暂等待后加入,但打印相同的数字。如果这是用cold
完成的,两个订阅将分别从0开始。