我想用RxJs实现Time Expiry缓存。以下是"正常"缓存的示例:
//let this represents "heavy duty job"
var data = Rx.Observable.return(Math.random() * 1000).delay(2000);
//and we want to cache result
var cachedData = new Rx.AsyncSubject();
data.subscribe(cachedData);
cachedData.subscribe(function(data){
//after 2 seconds, result is here and data is cached
//next subscribe returns immediately data
cachedData.subscribe(function(data2){ /*this is "instant"*/ });
});
首次调用cachedData
上的subscribe
时,调用"重载作业",2秒后结果保存在cachedData
(AsyncSubject
)中。cachedData
上的任何其他后续subscribe
立即返回并保存结果(因此缓存实现)。
我想实现的是在cachedData
有效的时间段内"增加趣味性",当时间过去时,我想为新数据重新运行"重载作业",并在新的时间段再次缓存它,等等。
期望行为:
//pseudo code
cachedData.youShouldExpireInXSeconds(10);
//let's assume that all code is sequential from here
//this is 1.st run
cachedData.subscribe(function (data) {
//this first subscription actually runs "heavy duty job", and
//after 2 seconds first result data is here
});
//this is 2.nd run, just after 1.st run finished
cachedData.subscribe(function (data) {
//this result is cached
});
//15 seconds later
// cacheData should expired
cachedData.subscribe(function (data) {
//i'm expecting same behaviour as it was 1.st run:
// - this runs new "heavy duty job"
// - and after 2 seconds we got new data result
});
//....
//etc
我是Rx(Js)的新手,不知道如何通过冷却来实现这种热观察。
您所缺少的只是安排一个任务,在一段时间后用新的AsyncSubject
替换您的cachedData
。以下是如何将其作为一种新的Rx.Observable
方法:
Rx.Observable.prototype.cacheWithExpiration = function(expirationMs, scheduler) {
var source = this,
cachedData = undefined;
// Use timeout scheduler if scheduler not supplied
scheduler = scheduler || Rx.Scheduler.timeout;
return Rx.Observable.create(function (observer) {
if (!cachedData) {
// The data is not cached.
// create a subject to hold the result
cachedData = new Rx.AsyncSubject();
// subscribe to the query
source.subscribe(cachedData);
// when the query completes, start a timer which will expire the cache
cachedData.subscribe(function () {
scheduler.scheduleWithRelative(expirationMs, function () {
// clear the cache
cachedData = undefined;
});
});
}
// subscribe the observer to the cached data
return cachedData.subscribe(observer);
});
};
用法:
// a *cold* observable the issues a slow query each time it is subscribed
var data = Rx.Observable.return(42).delay(5000);
// the cached query
var cachedData = data.cacheWithExpiration(15000);
// first observer must wait
cachedData.subscribe();
// wait 3 seconds
// second observer gets result instantly
cachedData.subscribe();
// wait 15 seconds
// observer must wait again
cachedData.subscribe();
一个简单的解决方案是为repeatWhen
创建一个自定义的可管道操作符,该操作符的持续时间已过。以下是我的想法:
export const refreshAfter = (duration: number) => (source: Observable<any>) =>
source.pipe(
repeatWhen(obs => obs.pipe(delay(duration))),
publishReplay(1),
refCount());
然后我这样使用它:
const serverTime$ = this.environmentClient.getServer().pipe(map(s => s.localTime))
const cachedServerTime$ = serverTime.pipe(refreshAfter(5000)); // 5s cache
重要提示:这使用publishReplay(1),refCount(),因为shareReplay(2)不会取消订阅可观察的源,因此它将永远访问您的服务器。不幸的是,这导致在出现错误时,该错误将从publishReplay(1)
、refCount()
重放。"新改进"的shareReplay即将推出。请参阅此处关于类似问题的注释。一旦这个"新"版本可用,这个答案就应该更新——但自定义运算符的美妙之处在于,你可以在一个地方修复它们。