如何使用Rx以非阻塞方式观察值



我试图在计时器上观察到它的处理程序比间隔长。为了做到这一点,我想在某种线程池、任务池或其他东西上安排观察。

我尝试了线程池、任务池和newthread,但都不起作用。有人知道怎么做吗?示例:

var disposable = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(100)).ObserveOn(Scheduler.NewThread).
    Subscribe(x =>
      {
      count++;
    Thread.Sleep(TimeSpan.FromMilliseconds(1000));
  });
  Thread.Sleep(TimeSpan.FromSeconds(5));
  disposable.Dispose();
  if (count > 10 )
  {
    //hurray...
  }

您所要求的是一个坏主意,因为您最终会耗尽可用资源(因为创建线程的速率>线程完成速率)。相反,为什么不在上一个项目完成后安排一个新项目呢?

在您的特定示例中,您需要将IScheduler传递给Observable。计时器,而不是尝试使用ObserveOn。

保罗说这是个坏主意,这是对的。从逻辑上讲,您正在创建一种情况,排队的操作可能会占用系统资源。你甚至可以发现它在你的电脑上工作,但在客户的电脑上失败了。可用的内存、32/64位处理器等都可能影响代码。

然而,修改代码以使其执行您想要的操作是很容易的。

不过,首先,只要观察者在下一个调度事件之前完成,Timer方法就会正确地调度计时器事件。如果观察者还没有完成,那么计时器将等待。请记住,可观测定时器是"冷"可观测的,因此对于每个订阅的观测者,实际上都有一个新的可观测定时器。这是一种一对一的关系。

这种行为可以防止计时器无意中破坏您的资源。

因此,正如您当前定义的代码一样,OnNext是每1000毫秒调用一次,而不是每100毫秒调用。

现在,要允许代码以100毫秒的时间表运行,请执行以下操作:

Observable
    .Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(100))
    .Select(x => Scheduler.NewThread.Schedule(() =>
    {
        count++;
        Thread.Sleep(TimeSpan.FromMilliseconds(1000));
    }))
    .Subscribe(x => { });

实际上,这段代码是一个IObservable<IDisposable>,其中每个一次性操作都是需要1000毫秒才能完成的计划操作。

在我的测试中,这运行得很好,并且正确地增加了计数。

我确实试着耗尽了我的资源,发现将计时器设置为每毫秒运行一次,我很快就得到了System.OutOfMemoryException,但我发现如果将设置更改为每两毫秒,代码就会运行。然而,当代码运行并创建了大约500个新线程时,这确实占用了超过500 MB的RAM。一点也不好。

小心操作!

如果你真的不断地以比消费更快的速度产生价值,那么正如所指出的,你就会陷入麻烦。如果你不能放慢生产速度,那么你需要考虑如何更快地消耗它们。也许您想要对观察器进行多线程处理以使用多个核心?

如果对观察器进行多线程处理,则可能需要小心处理无序的事件。您将同时处理多个通知,并且关于哪个处理首先完成(或首先进入某种比赛条件关键状态)的所有赌注都已取消。

如果您没有来处理流中的每个事件,请查看浮动的ObserveLatestOn的几个不同实现。这里和这里都有讨论它的线索。

ObserveLatestOn将丢弃除观察者处理先前通知时发生的最新通知之外的所有通知。当观察者完成处理上一个通知时,它将收到最新的通知,并错过其间发生的所有通知。

这样做的好处是,它可以防止生产商比消费者更快地积累压力。如果消费者因为负载而速度较慢,那么处理更多的通知只会让情况变得更糟。删除不需要的通知可以让负载减少到消费者可以跟上的程度。

相关内容

  • 没有找到相关文章