我正在使用响应式扩展(Reactive Extensions, Rx)和存储库模式来方便地从相对较慢的数据源获取数据。我有以下(简化的)接口:
public interface IStorage
{
IObservable<INode> Fetch(IObservable<Guid> ids);
}
创建IStorage
实现的实例是缓慢的-想想创建web服务或db连接。ids
观测中的每个Guid
都会在返回观测中产生一个一对一的INode
(或null
),每个结果都是昂贵的。因此,只有当我至少有一个值要获取,然后使用IStorage
仅为每个Guid
获取一次值时,才实例化IStorage
对我来说是有意义的。
IStorage
的调用,我在Repository
类中缓存结果,如下所示:
public class Repository
{
private Dictionary<Guid, INode> NodeCache { get; set; }
private Func<IStorage> StorageFactory { get; set; }
public IObservable<INode> Fetch(IObservable<Guid> ids)
{
var lazyStorage = new Lazy<IStorage>(this.StorageFactory);
// from id in ids
// if NodeCache contains id select NodeCache[id]
// else select node from lazyStorage.Value.Fetch(...)
}
}
在Repository.Fetch(...)
方法中,我包含了指示我正在尝试做什么的注释。
本质上,如果NodeCache
包含所有被提取的id,那么IStorage
永远不会被实例化,并且几乎没有延迟返回结果。然而,如果任何一个id不在缓存中,那么IStorage
将被实例化,所有未知的id将通过IStorage.Fetch(...)
方法传递。
需要维护一对一的映射,包括顺序保持。
任何想法?
我花了一点时间才算出来,但我终于有了自己的解决方案。
我定义了两个名为FromCacheOrFetch
的扩展方法,它们具有以下签名:
IObservable<R> FromCacheOrFetch<T, R>(
this IObservable<T> source,
Func<T, R> cache,
Func<IObservable<T>, IObservable<R>> fetch,
IScheduler scheduler)
where R : class
IObservable<R> FromCacheOrFetch<T, R>(
this IObservable<T> source,
Func<T, Maybe<R>> cache,
Func<IObservable<T>, IObservable<R>> fetch,
IScheduler scheduler)
第一个使用标准CLR/Rx类型,第二个使用Maybe
monad(可空类型,不限于值类型)。
第一个方法只是将Func<T, R>
转换为Func<T, Maybe<R>>
,并调用第二个方法。
背后的基本思想是,当要查询源时,将检查缓存中的每个值,以查看结果是否已经存在,如果存在,则立即返回结果。然而,如果任何结果丢失,那么只有在这种情况下,通过传入Subject<T>
调用fetch函数,现在所有缓存丢失都通过fetch函数传递。调用代码负责将结果添加到缓存中。代码通过fetch函数异步处理所有值,并将结果以及缓存的结果重新组装成正确的顺序。
效果很好。: -)
代码如下:
public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source,
Func<T, R> cache, Func<IObservable<T>, IObservable<R>> fetch,
IScheduler scheduler)
where R : class
{
return source
.FromCacheOrFetch<T, R>(t => cache(t).ToMaybe(null), fetch, scheduler);
}
public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source,
Func<T, Maybe<R>> cache, Func<IObservable<T>, IObservable<R>> fetch,
IScheduler scheduler)
{
var results = new Subject<R>();
var disposables = new CompositeDisposable();
var loop = new EventLoopScheduler();
disposables.Add(loop);
var sourceDone = false;
var pairsDone = true;
var exception = (Exception)null;
var fetchIn = new Subject<T>();
var fetchOut = (IObservable<R>)null;
var pairs = (IObservable<KeyValuePair<int, R>>)null;
var lookup = new Dictionary<T, int>();
var list = new List<Maybe<R>>();
var cursor = 0;
Action checkCleanup = () =>
{
if (sourceDone && pairsDone)
{
if (exception == null)
{
results.OnCompleted();
}
else
{
results.OnError(exception);
}
loop.Schedule(() => disposables.Dispose());
}
};
Action dequeue = () =>
{
while (cursor != list.Count)
{
var mr = list[cursor];
if (mr.HasValue)
{
results.OnNext(mr.Value);
cursor++;
}
else
{
break;
}
}
};
Action<KeyValuePair<int, R>> nextPairs = kvp =>
{
list[kvp.Key] = Maybe<R>.Something(kvp.Value);
dequeue();
};
Action<Exception> errorPairs = ex =>
{
fetchIn.OnCompleted();
pairsDone = true;
exception = ex;
checkCleanup();
};
Action completedPairs = () =>
{
pairsDone = true;
checkCleanup();
};
Action<T> sourceNext = t =>
{
var mr = cache(t);
list.Add(mr);
if (mr.IsNothing)
{
lookup[t] = list.Count - 1;
if (fetchOut == null)
{
pairsDone = false;
fetchOut = fetch(fetchIn.ObserveOn(Scheduler.ThreadPool));
pairs = fetchIn
.Select(x => lookup[x])
.Zip(fetchOut, (i, r2) => new KeyValuePair<int, R>(i, r2));
disposables.Add(pairs
.ObserveOn(loop)
.Subscribe(nextPairs, errorPairs, completedPairs));
}
fetchIn.OnNext(t);
}
else
{
dequeue();
}
};
Action<Exception> errorSource = ex =>
{
sourceDone = true;
exception = ex;
fetchIn.OnCompleted();
checkCleanup();
};
Action completedSource = () =>
{
sourceDone = true;
fetchIn.OnCompleted();
checkCleanup();
};
disposables.Add(source
.ObserveOn(loop)
.Subscribe(sourceNext, errorSource, completedSource));
return results.ObserveOn(scheduler);
}
类似这样的内容(我假设您只想为所有订阅者实例化存储一次):
public class Repository
{
public Repository()
{
_lazyStorage = new Lazy<IStorage>(StorageFactory);
}
private readonly Lazy<IStorage> _lazyStorage;
private Dictionary<Guid, INode> NodeCache { get; set; }
private Func<IStorage> StorageFactory { get; set; }
public IObservable<INode> Fetch(IObservable<Guid> ids)
{
return Observable
.CreateWithDisposable<INode>(observer =>
ids.Subscribe(x =>
{
INode node;
if (NodeCache.TryGetValue(x, out node))
observer.OnNext(node);
else
{
node = _lazyStorage.Value.Fetch(x);
NodeCache[x] = node;
observer.OnNext(node);
}
}, observer.OnError, observer.OnCompleted));
}
}
编辑:嗯,这个订单一边保存一边保管。异步取很有趣——等待存储。Fetch应该阻塞所有未来的值…思维…
我想我明白了…也许……如果你需要保持秩序,你需要排队。在RX世界中,队列是。concat。下面的内容对你有用吗?
public class Repository
{
public Repository()
{
_lazyStorage = new Lazy<IStorage>(StorageFactory);
}
private readonly Lazy<IStorage> _lazyStorage;
private Dictionary<Guid, INode> NodeCache { get; set; }
private Func<IStorage> StorageFactory { get; set; }
private IObservable<INode> Fetcher(Guid id)
{
return Observable.Defer(() =>
{
INode node;
return NodeCache.TryGetValue(id, out node)
? Observable.Return(node)
: _lazyStorage.Value.Fetch(id).Do(x => NodeCache[id] = x);
});
}
public IObservable<INode> Fetch(IObservable<Guid> ids)
{
return ids.Select(Fetcher).Concat();
}
}