在收听的同时定期调用订阅中的函数



请原谅我的无知,但我在反应式扩展方面有一点经验。我想要的是这样的东西:

myObservable.Do(d => myObserver.OnNext(d)).Interval(someTime).subscribe({ Run the this code periodically with period T=sometime});

口头上,给定一个一直发出项目的可观察量,需要的是将这些项目连续馈送到观察器中,并且每个时间段T调用subscribe内的函数。只是提到这个函数调用myObserver中的一些函数。上面的代码片段不起作用。它说myObservable不包含Interval的定义。您能否对情况进行一些说明?

编辑:观察者处理传入的项目。因此,我想要的是一直听,并每隔T调用一个启动和停止过程。这个过程必须收集物品并存储它们。有一些开始和停止memorystream。因此,在第一次(StartWith(-1L)),观察者启动一些处理,并在T秒后停止该过程。此启动和停止循环会定期继续。


编辑#2:

我有一个发出物品的IObservable<T>。这些物品由观察员接收。此观察器可以处理数据。我想要的是定期启动和停止此过程。这意味着观察者应该一直接收数据,并在一段时间内接收数据t,如果停止,则该过程必须启动,如果已经启动/运行,则必须停止。这个start()stop()函数是观察者的成员,即 observer.start() 和 observer.stop()。此外,如果进程正在运行,观察器内部还有一个标志/布尔值,该标志/布尔值为真或假。我认为Interval应该工作,但不适用于IObservable<T>.您能否对情况进行一些说明?

编辑#3我想我找到了我需要的东西,这要归功于讨论。

observable.subscribe(oberver) // observer always listening
TimeSpan someTime = TimeSpan.FromMinutes(1.0);
IObservable<long> myObservable = Observable.Interval(someTime);
myObservable.Subscribe(d =>
{
if (!observer.isProcessing)
{
observer.Start();
}else if (observer.isProcessing)
{
observer.Stop();
}
});

Interval是直接脱离Observable的静态方法 - 它不是IObservable<T>接口的一部分。你会像这样使用它:

TimeSpan someTime = TimeSpan.FromMinutes(1.0);
IObservable<long> myObservable = Observable.Interval(someTime);

现在你可以继续编写你想要的代码:

myObservable
.Do(d => myObserver.OnNext(d))
.Subscribe(d =>
{
//Run the this code periodically with period T=sometime
});

我不知道你为什么这样称呼myObserver.OnNext(d)。这似乎有点像代码气味。如果您可以为您的问题添加更多详细信息,那么也许我也可以帮助您解决该部分。

相关内容

  • 没有找到相关文章

最新更新