为什么不建议在.net响应式扩展中使用主题



我目前正在掌握。net的响应式扩展框架,我正在努力通过我找到的各种介绍资源(主要是http://www.introtorx.com)

我们的应用程序涉及许多检测网络帧的硬件接口,这些将是我的IObservables,然后我有各种组件将消耗这些帧或对数据执行某种转换方式并产生新类型的帧。例如,还会有其他组件需要显示每n帧。我确信Rx将对我们的应用程序很有用,但是我正在为IObserver接口的实现细节而挣扎。

我读过的大多数(如果不是全部的话)资源都说我不应该自己实现IObservable接口,而应该使用提供的函数或类之一。从我的研究看来,创建一个Subject<IBaseFrame>将为我提供我所需要的,我将有我的单线程从硬件接口读取数据,然后调用我的Subject<IBaseFrame>实例的OnNext函数。然后,不同的IObserver组件将接收来自该Subject的通知。

我的困惑来自于本教程附录中给出的建议,其中说:

避免使用主题类型。Rx实际上是一个函数式编程范例。使用主体意味着我们现在正在管理状态,而状态可能会发生变异。同时处理状态变化和异步编程是非常困难的。此外,许多操作符(扩展方法)都经过精心编写,以确保订阅和序列的生命周期保持正确和一致;当你引入主题时,你可以打破这个。如果显式地使用主题,将来的版本也可能会出现明显的性能下降。

我的应用程序对性能非常关键,在它进入生产代码之前,我显然要测试使用Rx模式的性能;然而,我担心我正在做一些违背Rx框架精神的事情,因为我使用了Subject类,并且框架的未来版本将会损害性能。

有没有更好的方法来做我想做的事?无论是否有观察者,硬件轮询线程都将持续运行(否则HW缓冲区将备份),因此这是一个非常热门的序列。然后我需要将接收到的帧传递给多个观察者。

如有任何建议,我将不胜感激。

好的,如果我们忽略了我的教条式的方法,忽略了"主题是好还是坏"。让我们看一下问题空间。

我敢打赌,你要么有两种需要融入的系统风格中的一种。

  1. 当消息到达时,系统引发事件或回调
  2. 您需要轮询系统,看看是否有任何消息需要处理

对于选项1,很简单,我们只需用适当的FromEvent方法包装它,就完成了。去酒吧!

对于选项2,我们现在需要考虑如何轮询以及如何有效地进行轮询。当我们获得值时,我们如何发布它?

我可以想象您需要一个专门的线程来进行轮询。你不会希望其他程序员敲打ThreadPool/TaskPool,让你陷入ThreadPool饥饿的境地。或者你不想要上下文切换的麻烦(我猜)。假设我们有自己的线程,我们可能会有某种While/Sleep循环,我们坐在其中进行轮询。当检查发现一些消息时,我们发布它们。所有这些听起来都很适合Observable.Create。现在我们可能不能使用While循环,因为它不允许我们返回一个Disposable来允许取消。幸运的是,您已经阅读了整本书,因此对递归调度非常熟悉!

我想这样的东西可以工作。# NotTested

public class MessageListener
{
    private readonly IObservable<IMessage> _messages;
    private readonly IScheduler _scheduler;
    public MessageListener()
    {
        _scheduler = new EventLoopScheduler();
        var messages = ListenToMessages()
                                    .SubscribeOn(_scheduler)
                                    .Publish();
        _messages = messages;
        messages.Connect();
    }
    public IObservable<IMessage> Messages
    {
        get {return _messages;}
    }
    private IObservable<IMessage> ListenToMessages()
    {
        return Observable.Create<IMessage>(o=>
        {
                return _scheduler.Schedule(recurse=>
                {
                    try
                    {           
                        var messages = GetMessages();
                        foreach (var msg in messages)
                        {
                            o.OnNext(msg);
                        }   
                        recurse();
                    }
                    catch (Exception ex)
                    {
                        o.OnError(ex);
                    }                   
                });
        });
    }
    private IEnumerable<IMessage> GetMessages()
    {
         //Do some work here that gets messages from a queue, 
         // file system, database or other system that cant push 
         // new data at us.
         // 
         //This may return an empty result when no new data is found.
    }
}

我真的不喜欢主题的原因是,这通常是开发人员对问题没有真正明确设计的情况。插入一个主题,到处戳戳,然后让可怜的支持开发人员猜测WTF发生了什么。当你使用Create/Generate等方法时,你正在对序列进行局部化。你可以在一种方法中看到这一切,而且你知道没有其他方法会产生令人讨厌的副作用。如果我看到一个主题字段,我现在必须去寻找它正在使用的类中的所有地方。如果某个MFer公开了一个,那么所有的赌注都结束了,谁知道这个序列是如何被使用的!异步/并发/Rx很难。您不需要让副作用和因果关系编程使您更加头晕目眩,从而使其变得更加困难。

一般情况下你应该避免使用Subject,但是对于你在这里做的事情,我认为它们工作得很好。当我在Rx教程中遇到"避免主题"信息时,我问了一个类似的问题。

引用Dave Sexton (of Rxx)的话

" subject是Rx的有状态组件。它们在什么时候有用你需要创建一个类似事件的可观察对象,作为一个字段或局部变量。"

我倾向于使用它们作为进入Rx的入口点。因此,如果我有一些代码需要说"发生了什么事"(就像您一样),我将使用Subject并调用OnNext。然后将其公开为IObservable供其他人订阅(您可以在主题上使用AsObservable()以确保没有人可以强制转换为主题并弄乱事情)。

你也可以用。net事件和使用FromEventPattern来实现这一点,但是如果我只打算把事件变成IObservable,我不认为用事件代替Subject有什么好处(这可能意味着我在这里遗漏了一些东西)

然而,你应该强烈避免使用Subject订阅IObservable,即不要将Subject传递给IObservable.Subscribe方法。

通常当你在管理一个主题时,你实际上只是在重新实现Rx中已经存在的功能,并且可能不是那么健壮,简单和可扩展的方式。

当您尝试将一些异步数据流调整到Rx中时(或者从当前非异步的数据流创建异步数据流),最常见的情况通常是:

  • 数据的来源是一个事件:正如Lee所说,这是最简单的情况:使用FromEvent并前往pub

  • 数据源来自同步操作,您希望轮询更新(例如web服务或数据库调用):在这种情况下,您可以使用Lee建议的方法,或者对于简单的情况,您可以使用Observable.Interval.Select(_ => <db fetch>)之类的东西。你可能想使用DistinctUntilChanged()来防止在源数据没有任何变化时发布更新。

  • 数据的来源是某种异步api,它调用你的回调:在这种情况下,使用Observable。创建连接你的回调调用OnNext/OnError/OnComplete的观察者

  • 数据的来源是一个调用阻塞,直到新的数据可用(例如一些同步套接字读取操作):在这种情况下,你可以使用Observable。创建用于包装从套接字读取并发布到Observer的命令式代码。读取数据时OnNext。

使用可观察到的。创建与创建一个管理Subject的类相当等同于使用yield关键字与创建一个实现IEnumerator的整个类。当然,您可以编写一个IEnumerator,使其与yield代码一样干净和良好,但是哪一个封装得更好,设计更简洁呢?Observable也是如此。Create vs managing Subjects.

可观测。Create为惰性设置和干净的拆卸提供了一个干净的模式。如何使用包装Subject的类来实现这一点?你需要某种Start方法…你怎么知道什么时候该结束?还是你总是开始说话,即使没人在听?当您完成后,如何让它停止从套接字读取/轮询数据库等?你必须有某种Stop方法,而且你不仅要能访问你订阅的IObservable,还要能访问最初创建Subject的那个类。

可观测。创建,它都被包裹在一个地方。Observable的主体。除非有人订阅,否则不会运行Create,因此如果没有人订阅,则永远不会使用资源。和可观察到的。Create返回一个Disposable,它可以干净地关闭你的资源/回调等——当观察者取消订阅时调用。你用来生成Observable的资源的生命周期与Observable本身的生命周期紧密相连。

引用的块文本很好地解释了为什么你不应该使用Subject<T>,但是为了更简单,你正在组合观察者和可观察对象的功能,同时在两者之间注入某种状态(无论你是封装还是扩展)。

这就是你遇到麻烦的地方;这些责任应该是分开的,彼此不同的。

也就是说,在您特定的情况下,我建议您将关注点分成更小的部分。

首先,您的线程是热的,并且始终监视硬件的信号以引发通知。你通常会怎么做?事件。我们就从这个开始吧

让我们定义事件将触发的EventArgs

// The event args that has the information.
public class BaseFrameEventArgs : EventArgs
{
    public BaseFrameEventArgs(IBaseFrame baseFrame)
    {
        // Validate parameters.
        if (baseFrame == null) throw new ArgumentNullException("IBaseFrame");
        // Set values.
        BaseFrame = baseFrame;
    }
    // Poor man's immutability.
    public IBaseFrame BaseFrame { get; private set; }
}

现在,将触发事件的类。注意,这可以是一个静态类(因为总是有一个线程在监视硬件缓冲区),也可以是一个按需调用的类,它订阅。你必须适当地修改它。

public class BaseFrameMonitor
{
    // You want to make this access thread safe
    public event EventHandler<BaseFrameEventArgs> HardwareEvent;
    public BaseFrameMonitor()
    {
        // Create/subscribe to your thread that
        // drains hardware signals.
    }
}

现在你有了一个公开事件的类。可观察对象可以很好地处理事件。如果您遵循标准的事件模式,通过Observable类上的静态FromEventPattern方法,可以将事件流(将事件流视为事件的多个触发)转换为IObservable<T>实现,这是一流的支持。

有了事件源和FromEventPattern方法,我们可以很容易地创建IObservable<EventPattern<BaseFrameEventArgs>> (EventPattern<TEventArgs>类体现了你在。net事件中看到的东西,特别是从EventArgs派生的实例和代表发送方的对象),如下所示:
// The event source.
// Or you might not need this if your class is static and exposes
// the event as a static event.
var source = new BaseFrameMonitor();
// Create the observable.  It's going to be hot
// as the events are hot.
IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
    FromEventPattern<BaseFrameEventArgs>(
        h => source.HardwareEvent += h,
        h => source.HardwareEvent -= h);

当然,您想要一个IObservable<IBaseFrame>,但这很容易,使用Observable类上的Select扩展方法来创建一个投影(就像您在LINQ中所做的那样,我们可以将所有这些都包装在一个易于使用的方法中):

public IObservable<IBaseFrame> CreateHardwareObservable()
{
    // The event source.
    // Or you might not need this if your class is static and exposes
    // the event as a static event.
    var source = new BaseFrameMonitor();
    // Create the observable.  It's going to be hot
    // as the events are hot.
    IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
        FromEventPattern<BaseFrameEventArgs>(
            h => source.HardwareEvent += h,
            h => source.HardwareEvent -= h);
    // Return the observable, but projected.
    return observable.Select(i => i.EventArgs.BaseFrame);
}

概括地说subject不适合用于公共接口是不好的。虽然这当然不是响应式编程方法应该有的样子,但对于经典代码来说,这绝对是一个很好的改进/重构选项。

如果你有一个带有公共set访问器的普通属性,并且你想要通知有关更改,那么用一个BehaviorSubject替换它是没有问题的。INPC或其他额外的活动就不那么干净了,这让我个人感到厌烦。为此,您可以并且应该使用BehaviorSubjects作为公共属性,而不是普通属性,并放弃INPC或其他事件。

另外,Subject-interface使你的接口的用户更了解你的属性的功能,更有可能订阅,而不仅仅是获取值。

如果你想让其他人收听/订阅某个属性的更改,那么最好使用

相关内容

  • 没有找到相关文章

最新更新