使用反应式扩展并行调用 2 个 httpwebrequests



我正在尝试并行调用 2 个 httpwebrequest,并使它们在使用 Rx 扩展时调用相同的回调。但我不知道我该如何实现这一点。这是我的代码:

    private static IObservable<Stream> GetImage(string path)
    {
        var uri = new Uri(path);
        var thumburi = new Uri(path + "_thumb.jpg");
        return Observable.Create<Stream>(o =>
                                                  {
                                                        var request = (HttpWebRequest) HttpWebRequest.Create(uri);
                                                        var readComplete =
                                                          Observable.FromAsyncPattern<WebResponse>(
                                                              request.BeginGetResponse,
                                                              request.EndGetResponse)();
                                                        var subscription = readComplete
                                                          .Select(e => e.GetResponseStream())
                                                          .Subscribe(o);

                                                      return subscription;
                                                  });
    }

使用最新的位和 .Net 4.5,您可以执行以下操作:

private static IObservable<byte[]> GetImages(string path)
{
    var sources = new Uri[]
    {
        var uri = new Uri(path),
        var thumburi = new Uri(path + "_thumb.jpg")
    };
    var obs = from uri in sources.ToObservable()
              from data in Observable.Using(
                   () => new WebClient(),
                   client => client.DownloadDataTaskAsync(uri).ToObservable())
              select data;
    return obs;
}

我确实想知道您是否真的只想返回大量数据,而不关心哪个流对应于基础,哪个是缩略图。并行发出请求后,您将无法再控制它们返回的顺序。如果需要,可以投影包含 uri 和数据流的类型,以消除它们的歧义。

猜你会把异步调用拉出到两个单独的流,然后连接它们,不是吗?像这样: http://leecampbell.blogspot.com/2010/06/rx-part-5-combining-multiple.html

我会建议这种解决方案。

使GetImage更通用的用途:

private static IObservable<Stream> GetImage(Uri uri)
{
    return Observable.Create<Stream>(o =>
    {
        var request = (HttpWebRequest)HttpWebRequest.Create(uri);
        var readComplete =
            Observable.FromAsyncPattern<WebResponse>(
                request.BeginGetResponse,
                request.EndGetResponse)();
        var subscription =
            readComplete
                .Select(e => e.GetResponseStream())
                .Subscribe(o);
        return subscription;
    });
}

然后添加一个特定的GetImages方法,用于查询图像及其缩略图:

private static IObservable<Tuple<Uri, Stream>> GetImages(string path)
{
    var uris = new []
    {
        new Uri(path + ".jpg"),
        new Uri(path + "_thumb.jpg"),
    }.ToObservable();
    return
        from uri in uris
        from stream in GetImage(uri)
        select Tuple.Create(uri, stream);
}

我假设您的path变量不能包含".jpg"扩展名,否则您将不得不进行一些字符串操作。

现在GetImages返回一个IObservable<Tuple<Uri, Stream>>,因为SelectMany不保证流的返回顺序,因此我们需要使用 Uri 来消除流的歧义。

让我知道这是否适合您。

为什么不直接使用 Zip?

GetStream("foo.jpg").Zip(GetStream("bar.jpg"), (foo, bar) => new { foo, bar })
    .Subscribe(fooAndBar => ...);

相关内容

  • 没有找到相关文章