IO可无限循环产生结果



这是我到目前为止开发的代码:

var observable = Observable.Create<string>(async observer =>
{
    var wc = new WebClient { UseDefaultCredentials = true };
    observer.OnNext(await wc.DownloadStringTaskAsync("http://ya.ru"));
});
observable.Subscribe(
    res => Debug.WriteLine("got result: {0}", res), 
    exc => Debug.WriteLine("exception: {0}", exc.Message)
); 

这可以正确获取网站数据并触发我的回调一次。我想要的是一个无限循环,其行为如下:等待结果 -> 调用OnNext ->等待 n 秒 ->重复操作。

创建一个Observable.Interval并将其SelectMany到我的Observable中并不完全可行,因为这将在固定的时间段内查询网站。我希望仅在上一个调用成功或失败后触发下一个调用。实现这一目标的最优雅、最简洁的方法是什么?

在不更改太多代码的情况下,您可以在产量之后有效地连接延迟,然后无限期地重复整个事情。

var observable = Observable.Create<string>(async observer =>
{
    var wc = new WebClient { UseDefaultCredentials = true };
    observer.OnNext(await wc.DownloadStringTaskAsync("http://ya.ru"));
});
observable
    .Concat(Observable.Empty<string>().Delay(TimeSpan.FromSeconds(1)))
    .Repeat()
    .Subscribe(
      res => Debug.WriteLine("got result: {0}", res), 
      exc => Debug.WriteLine("exception: {0}", exc.Message)
    ); 

但是,有更好的方法来执行此操作,因为在以前的实例中,您每秒都会创建一个新的 WebClient。相反,你可以做这样的事情...

using System.Reactive.Threading.Tasks;

var observable = Observable.Using(() => new WebClient(), (client) =>
    client.DownloadStringTaskAsync("http://ya.ru")
        .ToObservable()
        .Concat(Observable.Empty<string>().Delay(TimeSpan.FromSeconds(1)))
        .Repeat()
        );

如果您想重复错误,可以添加Retry...

var observable = Observable.Using(() => new WebClient(), (client) =>
    client.DownloadStringTaskAsync("http://ya.ru")
        .ToObservable()
        .Retry(3)
        .Concat(Observable.Empty<string>().Delay(TimeSpan.FromSeconds(1)))
        .Repeat()
        );

相关内容

  • 没有找到相关文章

最新更新