我试图在计时器上观察到它的处理程序比间隔长。为了做到这一点,我想在某种线程池、任务池或其他东西上安排观察。
我尝试了线程池、任务池和newthread,但都不起作用。有人知道怎么做吗?示例:
var disposable = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(100)).ObserveOn(Scheduler.NewThread).
Subscribe(x =>
{
count++;
Thread.Sleep(TimeSpan.FromMilliseconds(1000));
});
Thread.Sleep(TimeSpan.FromSeconds(5));
disposable.Dispose();
if (count > 10 )
{
//hurray...
}
您所要求的是一个坏主意,因为您最终会耗尽可用资源(因为创建线程的速率>线程完成速率)。相反,为什么不在上一个项目完成后安排一个新项目呢?
在您的特定示例中,您需要将IScheduler传递给Observable。计时器,而不是尝试使用ObserveOn。
然而,修改代码以使其执行您想要的操作是很容易的。
不过,首先,只要观察者在下一个调度事件之前完成,Timer
方法就会正确地调度计时器事件。如果观察者还没有完成,那么计时器将等待。请记住,可观测定时器是"冷"可观测的,因此对于每个订阅的观测者,实际上都有一个新的可观测定时器。这是一种一对一的关系。
这种行为可以防止计时器无意中破坏您的资源。
因此,正如您当前定义的代码一样,OnNext
是每1000毫秒调用一次,而不是每100毫秒调用。
现在,要允许代码以100毫秒的时间表运行,请执行以下操作:
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(100))
.Select(x => Scheduler.NewThread.Schedule(() =>
{
count++;
Thread.Sleep(TimeSpan.FromMilliseconds(1000));
}))
.Subscribe(x => { });
实际上,这段代码是一个IObservable<IDisposable>
,其中每个一次性操作都是需要1000毫秒才能完成的计划操作。
在我的测试中,这运行得很好,并且正确地增加了计数。
我确实试着耗尽了我的资源,发现将计时器设置为每毫秒运行一次,我很快就得到了System.OutOfMemoryException
,但我发现如果将设置更改为每两毫秒,代码就会运行。然而,当代码运行并创建了大约500个新线程时,这确实占用了超过500 MB的RAM。一点也不好。
小心操作!
如果你真的不断地以比消费更快的速度产生价值,那么正如所指出的,你就会陷入麻烦。如果你不能放慢生产速度,那么你需要考虑如何更快地消耗它们。也许您想要对观察器进行多线程处理以使用多个核心?
如果对观察器进行多线程处理,则可能需要小心处理无序的事件。您将同时处理多个通知,并且关于哪个处理首先完成(或首先进入某种比赛条件关键状态)的所有赌注都已取消。
如果您没有来处理流中的每个事件,请查看浮动的ObserveLatestOn
的几个不同实现。这里和这里都有讨论它的线索。
ObserveLatestOn
将丢弃除观察者处理先前通知时发生的最新通知之外的所有通知。当观察者完成处理上一个通知时,它将收到最新的通知,并错过其间发生的所有通知。
这样做的好处是,它可以防止生产商比消费者更快地积累压力。如果消费者因为负载而速度较慢,那么处理更多的通知只会让情况变得更糟。删除不需要的通知可以让负载减少到消费者可以跟上的程度。