Web客户端出现Rx argumentException



我正在使用Reactive Extensions,以便在Windows Phone上使用WebClient轻松下载网页。当我运行以下代码时,我在Subscribe调用中得到一个ArgumentExceptin。

参数名称:{0}

参数名称:型

public IObservable<DownloadStringCompletedEventArgs> StartRequest(string uri)
    {
        var _browser = new WebClient();
        var obs = Observable.FromEvent<DownloadStringCompletedEventHandler,
            DownloadStringCompletedEventArgs>(
                h => _browser.DownloadStringCompleted += h,
                h => _browser.DownloadStringCompleted -= h)
            .Where(e => !e.Cancelled)
            .Retry(3)
            .SelectMany(
                e => (e.Error == null && !string.IsNullOrEmpty(e.Result))
                         ? Observable.Return(e)
                         : Observable.Throw<DownloadStringCompletedEventArgs>(e.Error))
            .Take(1);

        _browser.DownloadStringAsync(new Uri(uri));
        return obs;
    }
 var obs = StartRequest("http://www.google.com");
        obs.Subscribe(
            x => Console.WriteLine(x.Result)
            );

您的问题是使用的是FromEvent扩展,而不是FromEventPattern扩展。

前者用于非标准事件,后者用于遵循"sender/eventargs"模式的事件。

也就是说,你的可观察性还有一些问题。

  • WebClient的实例永远不会被丢弃。你应该当然会发生这种情况,但因为你使用的是可观察的,所以你需要使用CCD_ 4扩展方法。

  • Where放在Take(1)之前可能意味着永远不会结束。

  • Retry也不会为你做正确的事情。如果"DownloadStringCompleted"observable出现错误,然后重试将重新附加到事件,但该事件永远不会返回新的值,因为您不再调用DownloadStringAsync

你想对这种可观察对象做的是,每次新的观察对象订阅时,让它重新执行web调用,但是,当订阅被共享时(例如,如果可观察对象被发布(,你不希望重新执行web呼叫。你有点想吃蛋糕,也想吃。

以下是操作方法:

public IObservable<string> CreateDownloadStringObservable(string uri)
{
    return Observable.Create<string>(o =>
    {
        var result = new ReplaySubject<string>();
        var inner = Observable.Using(() => new WebClient(), wc =>
        {
            var obs = Observable
                .FromEventPattern<
                    DownloadStringCompletedEventHandler,
                    DownloadStringCompletedEventArgs>(
                        h => wc.DownloadStringCompleted += h,
                        h => wc.DownloadStringCompleted -= h)
                .Take(1);
            wc.DownloadStringAsync(new Uri(uri));
            return obs;
        }).Subscribe(ep =>
        {
            if (ep.EventArgs.Cancelled)
            {
                result.OnCompleted();
            }
            else
            {
                if (ep.EventArgs.Error != null)
                {
                    result.OnError(ep.EventArgs.Error);
                }
                else
                {
                    result.OnNext(ep.EventArgs.Result);
                    result.OnCompleted();
                }
            }
        }, ex =>
        {
            result.OnError(ex);
        });
        return new CompositeDisposable(inner, result.Subscribe(o));
    });
}

现在你可以这样称呼它:

IObservable<string> obs =
    CreateDownloadStringObservable("http://www.google.com")
        .Retry(3);

请注意,Retry现在位于它应该属于的请求方法之外(除非您在方法调用中添加retry参数(。

您不能将Retry与FromEvent一起使用,因为FromEvent是一个热点可观察的结果-您只会得到3次相同的结果。修复现有代码的最简单方法是通过Observable。延时:

string uri = "http://www.whatever.com";
var webClient = Observable.Defer(() => {
    var browser = new WebClient();
    var ret =  Observable.FromEvent<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
        h => _browser.DownloadStringCompleted += h,
        h => _browser.DownloadStringCompleted -= h);
    browser.DownloadStringAsync(new Uri(uri));
    return ret;
});
webClient
    .Timeout(TimeSpan.FromSeconds(15))
    .Retry(3);

Defer意味着不会立即创建WebClient,而是为所有订阅((到Observable的人创建它,这是Retry所需要的。

相关内容

  • 没有找到相关文章

最新更新