假设你有一个包含100个url的列表,你想下载它们,解析响应并通过IObservable推送结果:
public IObservable<ImageSource> GetImages(IEnumerable<string> urls)
{
return urls
.ToObservable()
.Select(async url =>
{
var bytes = await this.DownloadImage(url);
var image = await this.ParseImage(bytes);
return image;
});
}
我有一些问题。
一个是,在同一时间用100个请求冲击服务器是不礼貌的——理想情况下,你可能会在给定时刻限制6个请求。但是,如果我添加Buffer
调用,由于Select
中的异步lambda,所有内容仍然同时触发。
此外,返回结果的顺序将与url的输入顺序不同,这很糟糕,因为图像是将在UI上显示的动画的一部分。
我已经尝试了各种方法,我有一个有效的解决方案,但感觉很复杂:
public IObservable<ImageSource> GetImages(IEnumerable<string> urls)
{
var semaphore = new SemaphoreSlim(6);
return Observable.Create<ImageSource>(async observable =>
{
var tasks = urls
.Select(async url =>
{
await semaphore.WaitAsync();
var bytes = await this.DownloadImage(url);
var image = await this.ParseImage(url);
})
.ToList();
foreach (var task in tasks)
{
observable.OnNext(await task);
}
observable.OnCompleted();
});
}
它工作,但现在我做的是Observable.Create
而不是IObservable.Select
,我必须弄乱信号量。此外,在UI上运行的其他动画停止时,这是运行(他们基本上只是DispatcherTimer
实例),所以我想我一定是做错了什么。
试一试:
urls.ToObservable()
.Select(url => Observable.FromAsync(async () => {
var bytes = await this.DownloadImage(url);
var image = await this.ParseImage(bytes);
return image;
}))
.Merge(6 /*at a time*/);
我们在这里做什么?对于每个URL,我们都创建了一个冷观察对象(也就是说,它不会做任何事情,直到有人调用Subscribe)。FromAsync
返回一个可观察对象,当你订阅它时,运行你给它的异步块。因此,我们选择URL到一个对象,它会为我们做的工作,但只有当我们问它。
那么,我们的结果是一个IObservable<IObservable<Image>>
——一个未来结果流。我们想把这个流平铺成一个结果流,所以我们使用Merge(int)
。合并操作符将一次订阅n
项,当它们返回时,我们将订阅更多项。即使url列表非常大,Merge所缓冲的项目也只是一个url和一个Func对象(即要做什么的描述),所以相对较小。
实现这一目标的一种方法是使用Observable.StartAsync
方法、SemaphoreSlim
和Concat
运算符。Observable.StartAsync
将创建热可观察对象(立即启动),SemaphoreSlim
将限制图像的下载/解析,Concat
将按原始顺序收集图像。
public IObservable<ImageSource> GetImages(IEnumerable<string> urls, int maxConcurrency)
{
return Observable.Using(() => new SemaphoreSlim(maxConcurrency), semaphore =>
urls
.ToObservable()
.Select(url => Observable.StartAsync(async cancellationToken =>
{
await semaphore.WaitAsync(cancellationToken);
try
{
var bytes = await this.DownloadImage(url);
var image = await this.ParseImage(bytes);
return image;
}
finally { semaphore.Release(); }
}))
.Concat());
}
您可以考虑将cancellationToken
参数传递给DownloadImage
和ParseImage
方法,以避免在后台运行"即发即弃"操作,以防产生的IObservable<ImageSource>
由于任何原因(错误或取消订阅)而过早终止。