我正在尝试创建一个GetAndFetch
方法,该方法首先从缓存中返回数据,然后从Web服务获取和返回数据,最后更新缓存。
这样的函数已经存在于akavache
中,但是,它检索或存储的数据就像一个 blob。 即,如果我对rss
提要感兴趣,我只能在整个提要级别工作,而不能在单个项目上工作。我有兴趣创建一个将项目返回为IObservable<Item>
的版本。这样做的好处是,新Item
可以在service
返回后立即显示,而不是等待所有Items
。
public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
{
// The basic idea is to first get the cached objects
IObservable<HashSet<Item>> cacheBlobObject = cache.GetObject<HashSet<Item>>(feedUrl);
// Then call the service
IObservable<Item> fetchObs = service.GetItems(feedUrl);
// Consolidate the cache & the retrieved data and then update cache
IObservable<Item> updateObs = fetchObs
.ToArray()
.MyFilter() // filter out duplicates between retried data and cache
.SelectMany(arg =>
{
return cache.InsertObject(feedUrl, arg)
.SelectMany(__ => Observable.Empty<Item>());
});
// Then make sure cache retrieval, fetching and update is done in order
return cacheBlobObject.SelectMany(x => x.ToObservable())
.Concat(fetchObs)
.Concat(upadteObs);
}
我的方法的问题在于Concat(upadteObs)
重新订阅fetchObs
并最终再次调用service.GetItems(feedUrl)
,这是浪费。
你听起来需要.Publish(share => { ... })
重载。
试试这个:
public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
{
// The basic idea is to first get the cached objects
IObservable<HashSet<Item>> cacheBlobObject = cache.GetObject<HashSet<Item>>(feedUrl);
return
service
.GetItems(feedUrl)
.Publish(fetchObs =>
{
// Consolidate the cache & the retrieved data and then update cache
IObservable<Item> updateObs =
fetchObs
.ToArray()
.MyFilter() // filter out duplicates between retried data and cache
.SelectMany(arg =>
cache
.InsertObject(feedUrl, arg)
.SelectMany(__ => Observable.Empty<Item>()));
// Then make sure cache retrieval, fetching and update is done in order
return
cacheBlobObject
.SelectMany(x => x.ToObservable())
.Concat(fetchObs)
.Concat(updateObs);
});
}
我担心Concat
电话 - 它们可能需要Merge
.
此外,似乎您对service.GetItems
的调用无论如何都会获取所有项目 - 它如何避免缓存中已有的项目?
基于注释的替代实现:
public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
{
return
(
from hs in cache.GetObject<HashSet<Item>>(feedUrl)
let ids = new HashSet<string>(hs.Select(x => x.Id))
select
hs
.ToObservable()
.Merge(
service
.GetItems(feedUrl)
.Where(x => !ids.Contains(x.Id))
.Do(x => cache.InsertObject(feedUrl, new [] { x })))
).Merge();
}