请原谅我的无知,但我在反应式扩展方面有一点经验。我想要的是这样的东西:
myObservable.Do(d => myObserver.OnNext(d)).Interval(someTime).subscribe({ Run the this code periodically with period T=sometime});
口头上,给定一个一直发出项目的可观察量,需要的是将这些项目连续馈送到观察器中,并且每个时间段T
调用subscribe
内的函数。只是提到这个函数调用myObserver
中的一些函数。上面的代码片段不起作用。它说myObservable
不包含Interval
的定义。您能否对情况进行一些说明?
编辑:观察者处理传入的项目。因此,我想要的是一直听,并每隔T
调用一个启动和停止过程。这个过程必须收集物品并存储它们。有一些开始和停止memorystream
。因此,在第一次(StartWith(-1L)
),观察者启动一些处理,并在T
秒后停止该过程。此启动和停止循环会定期继续。
编辑#2:
我有一个发出物品的IObservable<T>
。这些物品由观察员接收。此观察器可以处理数据。我想要的是定期启动和停止此过程。这意味着观察者应该一直接收数据,并在一段时间内接收数据t
,如果停止,则该过程必须启动,如果已经启动/运行,则必须停止。这个start()
stop()
函数是观察者的成员,即 observer.start() 和 observer.stop()。此外,如果进程正在运行,观察器内部还有一个标志/布尔值,该标志/布尔值为真或假。我认为Interval
应该工作,但不适用于IObservable<T>
.您能否对情况进行一些说明?
编辑#3我想我找到了我需要的东西,这要归功于讨论。
observable.subscribe(oberver) // observer always listening
TimeSpan someTime = TimeSpan.FromMinutes(1.0);
IObservable<long> myObservable = Observable.Interval(someTime);
myObservable.Subscribe(d =>
{
if (!observer.isProcessing)
{
observer.Start();
}else if (observer.isProcessing)
{
observer.Stop();
}
});
Interval
是直接脱离Observable
的静态方法 - 它不是IObservable<T>
接口的一部分。你会像这样使用它:
TimeSpan someTime = TimeSpan.FromMinutes(1.0);
IObservable<long> myObservable = Observable.Interval(someTime);
现在你可以继续编写你想要的代码:
myObservable
.Do(d => myObserver.OnNext(d))
.Subscribe(d =>
{
//Run the this code periodically with period T=sometime
});
我不知道你为什么这样称呼myObserver.OnNext(d)
。这似乎有点像代码气味。如果您可以为您的问题添加更多详细信息,那么也许我也可以帮助您解决该部分。