实现一个 IObservable 来计算 Pi 的数字


这是一项

学术练习,我是反应式扩展的新手,并试图了解这项技术。我给自己设定了一个目标,即制作一个返回 Pi 连续数字的 IObservable(由于不相关的原因,我碰巧现在对 Pi 非常感兴趣)。反应式扩展包含用于制作可观察量的运算符,他们给出的指导是您应该"几乎永远不需要创建自己的 IObsevable"。但是我看不出如何使用现成的运算符和方法做到这一点。让我再阐明一下。

我计划使用一种算法,该算法将涉及对Arctan的泰勒级数的扩展。为了获得 Pi 的下一个数字,我将在该系列中再扩展几个术语。

所以我需要异步进行系列扩展,偶尔将下一个计算数字扔给 IObserver。我显然不想为每个新数字从头开始计算。

有没有办法使用 RX 的内置运算符实现此行为,还是我必须从头开始编写 IObservable?什么策略暗示自己?

对于这样的事情,最简单的方法是使用主题。主题既是IObservable又是IObserver,这听起来有点奇怪,但它允许您像这样使用它们:

class PiCalculator
{
    private readonly Subject<int> resultStream = new Subject<int>();
    public IObservable<int> ResultStream
    {
        get { return resultStream; }
    }
    public void Start()
    {
        // Whatever the algorithm actually is
        for (int i = 0; i < 1000; i++)
        {
            resultStream.OnNext(i);
        }
    }
}

因此,在您的算法中,您只需在想要生成下一个值时调用主题上的OnNext

然后要使用它,您只需要以下内容:

var piCalculator = new PiCalculator();
piCalculator.ResultStream.Subscribe(n => Console.WriteLine((n)));
piCalculator.Start();

最简单的方法是创建一个Enumerable,然后转换它:

IEnumerable<int> Pi()
{
    // algorithm here
    for (int i = 0; i < 1000; i++)
    {
        yield return i;
    }
}

用法(对于冷可观察对象,即每个新的"订阅"都从头开始创建 Pi):

var cold = Pi().ToObservable(Scheduler.ThreadPool);
cold.Take(5).Subscribe(Console.WriteLine);

如果你想让它hot(每个人都共享相同的底层计算),你可以这样做:

var hot = cold.Publish().RefCount();

这将在第一个订阅者之后开始计算,并在他们全部断开连接时停止计算。这是一个简单的测试:

hot.Subscribe(p => Console.WriteLine("hot1: " + p));
Thread.Sleep(5);    
hot.Subscribe(p => Console.WriteLine("hot2: " + p));    

这应该只显示hot1打印一会儿,然后hot2短暂等待后加入,但打印相同的数字。如果这是用cold完成的,两个订阅将分别从0开始。

相关内容

  • 没有找到相关文章

最新更新