Rx如何从发布/子模式创建序列



我正在尝试使用Rx从发布/子模式(即由生产者发布下一个元素的经典观察者模式)创建序列进行评估。这与.net事件基本相同,只是我们需要将其概括为不需要有事件,所以我不能利用Observable.FromEvent。我玩过Observable.Create和Observable。Generate和发现自己最终不得不写代码来处理pub/sub(即,我必须写生产者/消费者代码来存储发布的项目,然后通过调用IObserver.OnNext()来消费它),所以看起来我并没有真正利用Rx。。。

我是在寻找正确的路径,还是这很适合Rx?

感谢

您的发布者只是将一些IObservables公开为属性。而您的消费者只需向他们发送Subscribe(或在订阅前进行他们想要的任何Rx-fu)。

有时,这就像在发布服务器中使用Subjects一样简单。有时它更复杂,因为你的出版商实际上正在观察其他一些可观察的过程。

这里有一个愚蠢的例子:

public class Publisher
{
    private readonly Subject<Foo> _topic1;
    /// <summary>Observe Foo values on this topic</summary>
    public IObservable<Foo> FooTopic
    {
       get { return _topic1.AsObservable(); }
    }
    private readonly IObservable<long> _topic2;
    /// <summary>Observe the current time whenever our clock ticks</summary>
    public IObservable<DateTime> ClockTickTopic
    {
        get { return _topic2.Select(t => DateTime.Now); }
    }
    public Publisher()
    {
         _topic1 = new Subject<Foo>();
         // tick once each second
         _topic2 = Observable.Interval(TimeSpan.FromSeconds(1));
    }
    /// <summary>Let everyone know about the new Foo</summary>
    public NewFoo(Foo foo) { _topic1.OnNext(foo); }
}

// interested code...
Publisher p = ...;
p.FooTopic.Subscribe(foo => ...);
p.ClickTickTopic.Subscribe(currentTime => ...);
// count how many foos occur during each clock tick
p.FooTopic.Buffer(p.ClockTickTopic)
    .Subscribe(foos => Console.WriteLine("{0} foos during this interval", foos.Count));

使用RX绝对非常适合pub/sub。这里有一个演示,演示了使用IObservable和RX的最简单的pub/sub模式。

使用NuGet将反应式扩展(RX)添加到您的项目中,搜索rx-main并安装Reactive Extensions - Main Library

using System;
using System.Reactive.Subjects;
namespace RX_2
{
    public static class Program
    {
        static void Main(string[] args)
        {
            Subject<int> stream = new Subject<int>();
            stream.Subscribe(
                o =>
                {
                    Console.Write(o);
                });
            stream.Subscribe(
                o =>
                {
                    Console.Write(o);
                });
            for (int i = 0; i < 5; i++)
            {
                stream.OnNext(i);
            }
            Console.ReadKey();
        }
    }
}

执行时,代码输出如下:

0011223344

相关内容

  • 没有找到相关文章

最新更新