我正在尝试并行调用 2 个 httpwebrequest,并使它们在使用 Rx 扩展时调用相同的回调。但我不知道我该如何实现这一点。这是我的代码:
private static IObservable<Stream> GetImage(string path)
{
var uri = new Uri(path);
var thumburi = new Uri(path + "_thumb.jpg");
return Observable.Create<Stream>(o =>
{
var request = (HttpWebRequest) HttpWebRequest.Create(uri);
var readComplete =
Observable.FromAsyncPattern<WebResponse>(
request.BeginGetResponse,
request.EndGetResponse)();
var subscription = readComplete
.Select(e => e.GetResponseStream())
.Subscribe(o);
return subscription;
});
}
使用最新的位和 .Net 4.5,您可以执行以下操作:
private static IObservable<byte[]> GetImages(string path)
{
var sources = new Uri[]
{
var uri = new Uri(path),
var thumburi = new Uri(path + "_thumb.jpg")
};
var obs = from uri in sources.ToObservable()
from data in Observable.Using(
() => new WebClient(),
client => client.DownloadDataTaskAsync(uri).ToObservable())
select data;
return obs;
}
我确实想知道您是否真的只想返回大量数据,而不关心哪个流对应于基础,哪个是缩略图。并行发出请求后,您将无法再控制它们返回的顺序。如果需要,可以投影包含 uri 和数据流的类型,以消除它们的歧义。
猜你会把异步调用拉出到两个单独的流,然后连接它们,不是吗?像这样: http://leecampbell.blogspot.com/2010/06/rx-part-5-combining-multiple.html
我会建议这种解决方案。
使GetImage
更通用的用途:
private static IObservable<Stream> GetImage(Uri uri)
{
return Observable.Create<Stream>(o =>
{
var request = (HttpWebRequest)HttpWebRequest.Create(uri);
var readComplete =
Observable.FromAsyncPattern<WebResponse>(
request.BeginGetResponse,
request.EndGetResponse)();
var subscription =
readComplete
.Select(e => e.GetResponseStream())
.Subscribe(o);
return subscription;
});
}
然后添加一个特定的GetImages
方法,用于查询图像及其缩略图:
private static IObservable<Tuple<Uri, Stream>> GetImages(string path)
{
var uris = new []
{
new Uri(path + ".jpg"),
new Uri(path + "_thumb.jpg"),
}.ToObservable();
return
from uri in uris
from stream in GetImage(uri)
select Tuple.Create(uri, stream);
}
我假设您的path
变量不能包含".jpg"扩展名,否则您将不得不进行一些字符串操作。
现在GetImages
返回一个IObservable<Tuple<Uri, Stream>>
,因为SelectMany
不保证流的返回顺序,因此我们需要使用 Uri
来消除流的歧义。
让我知道这是否适合您。
为什么不直接使用 Zip?
GetStream("foo.jpg").Zip(GetStream("bar.jpg"), (foo, bar) => new { foo, bar })
.Subscribe(fooAndBar => ...);