使用响应式扩展轮询web服务,并绑定最后x个结果



我正在尝试使用响应式扩展(Rx)的任务,它似乎是一个很好的适合,轮询在一个特定的间隔一个web服务,并显示它的最后x个结果。

我有一个web服务,它向我发送我想要监视的仪器的状态。我想以特定的速率轮询此仪器,并在列表中显示最近已轮询的20个状态。

所以我的列表就像一个服务结果的"移动窗口"。

我正在开发一个WPF应用程序与Caliburn。微,但我不认为这是非常相关的。

我设法得到直到现在是以下内容(只是一个示例应用程序,我很快被黑客攻击,我不打算在真正的应用程序中的ShellViewModel中这样做):

public class ShellViewModel : Caliburn.Micro.PropertyChangedBase, IShell
{
    private ObservableCollection<string> times;
    private string currentTime;
    public ShellViewModel()
    {
        times = new ObservableCollection<string>();
        Observable
            .Interval(TimeSpan.FromSeconds(1))
            .SelectMany(x => this.GetCurrentDate().ToObservable())
            .ObserveOnDispatcher()
            .Subscribe(x =>
            {
                this.CurrentTime = x;
                this.times.Add(x);
            });
    }
    public IEnumerable<string> Times
    {
        get
        {
            return this.times;
        }
    }
    public string CurrentTime
    {
        get
        {
            return this.currentTime;
        }
        set
        {
            this.currentTime = value;
            this.NotifyOfPropertyChange(() => this.CurrentTime);
        }
    }
    private async Task<string> GetCurrentDate()
    {
        var client = new RestClient("http://www.timeapi.org");
        var request = new RestRequest("/utc/now.json");
        var response = await client.ExecuteGetTaskAsync(request);
        return response.Content;
    }
}

在视图中,我只有一个绑定到CurrentTime属性的标签和一个绑定到Times属性的列表。

我的问题是:

  • 它不限于列表中的20个项目,因为我总是将项目添加到ObservableCollection,但我找不到更好的方法来数据绑定
  • 间隔不像我想的那样工作。如果查询运行时间超过1秒,则两个查询将并行运行,我不希望发生这种情况。我的目标是使查询无限重复,但以每秒不超过1个查询的速度重复。如果一个查询的结束时间超过1秒,它应该等待它完成并直接触发新的查询。

第二个编辑:

之前的编辑是我的愚蠢和非常困惑,它连续触发事件,因为间隔是连续的,永远不会结束。Brandon的解决方案是正确的,并按预期工作。

编辑:

基于Brandon的例子,我尝试在LinqPad中执行以下代码:

Observable
    .Merge(Observable.Interval(TimeSpan.FromSeconds(2)), Observable.Interval(TimeSpan.FromSeconds(10)))
    .Repeat()
    .Scan(new List<double>(), (list, item) => { list.Add(item); return list; })
    .Subscribe(x => Console.Out.WriteLine(x))

我可以看到对控制台的写入每2秒发生一次,而不是每10秒发生一次。所以Repeat在重复之前不会等待两个Observable都完成。

试试这个:

// timer that completes after 1 second
var intervalTimer = Observable
    .Empty<string>()
    .Delay(TimeSpan.FromSeconds(1));
// queries one time whenever subscribed
var query = Observable.FromAsync(GetCurrentDate);
// query + interval timer which completes
// only after both the query and the timer
// have expired
var intervalQuery = Observable.Merge(query, intervalTimer);
// Re-issue the query whenever intervalQuery completes
var queryLoop = intervalQuery.Repeat();
// Keep the 20 most recent results
// Note.  Use an immutable list for this
// https://www.nuget.org/packages/microsoft.bcl.immutable
// otherwise you will have problems with
// the list changing while an observer
// is still observing it.
var recentResults = queryLoop.Scan(
    ImmutableList.Create<string>(), // starts off empty
    (acc, item) =>
    {
        acc = acc.Add(item);
        if (acc.Count > 20)
        {
            acc = acc.RemoveAt(0);
        }
        return acc;
    });
// store the results
recentResults
    .ObserveOnDispatcher()
    .Subscribe(items =>
    {
        this.CurrentTime = items[0];
        this.RecentItems = items;
    });

当GetCurrentDate正在进行时,应该跳过间隔消息。

Observable
    .Interval(TimeSpan.FromSeconds(1))
    .GroupByUntil(p => 1,p => GetCurrentDate().ToObservable().Do(x => {
                            this.CurrentTime = x;
                            this.times.Add(x);
                        }))
    .SelectMany(p => p.LastAsync())
    .Subscribe();

相关内容

  • 没有找到相关文章