C#中Reactive数据轮询的有效设置



我需要实现对来自远程组件的一些数据的定期轮询
它应该在上次轮询完成后的下次轮询数据+等待,比如说5秒
我想出了下面的代码,但后来发现应用程序在关闭时抛出异常-事件查看器有一个0xc00000fd的记录,这是堆栈溢出异常
值得一提的是,只有当应用程序打开并轮询数据很长一段时间时才会发生此异常(堆栈溢出需要时间)
所有这些都是一个WPF应用程序,下面的代码在ViewModel中。

我知道为什么这个代码中会发生异常(可能不应该在Subscribe中调用OnNext),但正确的实现方式是什么?

_ctrlSubj = new Subject<ControllerInfo>();
_ctrlSubj.SelectMany(async _ => 
{
// CurrentController is of type ControllerInfo
// next line can take various amount of time
var jobDetails = await Library.GetJobsAsync(CurrentController);
return jobDetails;
})
.ObserveOnDispatcher()
.Subscribe(async e =>
{
// Jobs is bound to View
Jobs = new ObservableCollection<JobDetail>(jobDetails);
await Task.Delay(TimeSpan.FromSeconds(5));
_ctrlSubj.OnNext(CurrentController);
});

第二次编辑

@Aron的回答让我想起了多次订阅的问题,下面的回答就是这样。我推荐一个助手函数IntervalAsync,如下所示:

public static class RxExtensions
{
public static IObservable<TResult> IntervalAsync<TResult>(Func<IObservable<TResult>> f, TimeSpan period)
{
return IntervalAsync(f, period, Scheduler.Default);
}
public static IObservable<TResult> IntervalAsync<TResult>(Func<IObservable<TResult>> f, TimeSpan period, IScheduler scheduler)
{
return Observable.Create<TResult>(o =>
{
var q = new BehaviorSubject<TimeSpan>(TimeSpan.Zero);
var observable = q
.Delay(t => Observable.Timer(t))
.SelectMany(_ => f())
.Do(t => q.OnNext(period));
return observable.Subscribe(o);
});
}
}

最后的代码看起来像这样:

var subscription = RxExtensions.IntervalAsync(
() => Observable.FromAsync(() => Library.GetJobsAsync(CurrentController)), 
TimeSpan.FromSeconds(5)
)
.ObserveOnDispatcher()
.Subscribe(i =>
{
Jobs = new ObservableCollection<JobDetail>(jobDetails);
});

@阿伦的答案是有效的。我并不觉得它更简单,因为你有更多的Rx-TPL混合,尽管我承认"更简单"在旁观者眼中。


首次编辑:(不推荐,存在多个订阅错误)。

您的担忧是有效的:对于同步反应管道,Interval将等待整个管道完成。但是有了异步管道,Interval就站不住了。因此,如果第一个异步任务耗时4.5秒,那么下一个任务将在第一个任务完成后0.5秒启动。

如果你想让结束到开始的延迟是一个固定的时间跨度,我认为最好采用排队机制,类似于你设置的机制。我会做类似的事情:

var q = new BehaviorSubject<TimeSpan>(TimeSpan.Zero);
var subscription = q
.Delay(t => Observable.Timer(t))
.SelectMany(_ => Observable.FromAsync(() => Library.GetJobsAsync(CurrentController))
.Subscribe(i =>
{
Jobs = new ObservableCollection<JobDetail>(jobDetails);
});

Rx的线程&我认为堆栈管理在这里比TPL更有效,而且这不会导致无限堆栈。然而,我还没有测试过。


原始答案:

这可以做到,但我不能测试,因为没有类型。

var subscription = Observable.Interval(TimeSpan.FromSeconds(5))
.SelectMany(_ => Observable.FromAsync(() => Library.GetJobsAsync(CurrentController))
.ObserveOnDispatcher()
.Subscribe(jobDetails =>
{
Jobs = new ObservableCollection<JobDetail>(jobDetails);
});

如果不好,那么请修改您的答案以包含mcve。


测试代码:

这是我用来测试异步TPL/RX混合的代码。没有完全复制@IgorStack的环境,因为没有WPF(没有.ObserveOnDispatcher()):

var f = new Func<Task<int>>(async () => {
await Task.Delay(TimeSpan.FromSeconds(1));
return 3;
});
var scheduler = new EventLoopScheduler(); //or Scheduler.Default
var o1 = RxExtensions.IntervalAsync(() => Observable.FromAsync(() => f()), TimeSpan.FromSeconds(5), scheduler)
.Timestamp();
var subscription1 = o1.Subscribe(i =>
{
Console.WriteLine("s1: " + i.Timestamp.ToString("hh:mm:ss.ffff"));
});
var subscription2 = o1.Subscribe(i =>
{
Console.WriteLine("s2: " + i.Timestamp.ToString("hh:mm:ss.ffff"));
});

对不起@Shlomo,实现这一点的最简单方法如下(这将使您的端到端延迟5秒)。

var jobs = Observable.Create<List<Job>>(async (observer, cancel) => {
while(cancel.IsCancellationRequested == false)
{
try
{
var ret = await Library.GetJobsAsync(CurrentController);
observer.OnNext(ret);
await Task.Delay(5000, cancel);
}
catch(Exception ex)
{
observer.OnError(ex);
return;
}
}
observer.OnCompleted();
});

如果你想开始延迟5秒启动,你可以简单地用交换代码体

var delay = Task.Delay(5000, cancel);
var ret = await Library.GetJobsAsync(CurrentController);
observer.OnNext(ret);
await delay;

以下是重写为LinqPad"UnitTest"的代码。。。您可以确认结果。

void Main()
{
var scheduler = new TestScheduler();
var foo = Observable.Create<int>(async (observer, cancellationToken) => {
while(!cancellationToken.IsCancellationRequested){
var ret = await DoStuff(scheduler);
observer.OnNext(ret);
await Observable.Delay(
Observable.Return(Unit.Default), 
TimeSpan.FromSeconds(5), 
scheduler)
.ToTask();
}
});
using(foo.Timestamp(scheduler).Subscribe(f => Console.WriteLine(f.Timestamp))){
scheduler.AdvanceBy(TimeSpan.FromSeconds(120).Ticks);
}
}
// Define other methods and classes here

public Task<int> DoStuff(IScheduler scheduler){
return Observable.Delay(Observable.Return(1), TimeSpan.FromSeconds(1), scheduler).ToTask();
}

输出如下:

01/01/0001 00:00:01 +00:00
01/01/0001 00:00:07 +00:00
01/01/0001 00:00:13 +00:00
01/01/0001 00:00:19 +00:00
01/01/0001 00:00:25 +00:00
01/01/0001 00:00:31 +00:00
01/01/0001 00:00:37 +00:00
01/01/0001 00:00:43 +00:00
01/01/0001 00:00:49 +00:00
01/01/0001 00:00:55 +00:00
01/01/0001 00:01:01 +00:00
01/01/0001 00:01:07 +00:00
01/01/0001 00:01:13 +00:00
01/01/0001 00:01:19 +00:00
01/01/0001 00:01:25 +00:00
01/01/0001 00:01:31 +00:00
01/01/0001 00:01:37 +00:00
01/01/0001 00:01:43 +00:00
01/01/0001 00:01:49 +00:00
01/01/0001 00:01:55 +00:00

最新更新