如何在RxJS中实现时间到期热可观察(或在反应式扩展中通用)



我想用RxJs实现Time Expiry缓存。以下是"正常"缓存的示例:

//let this represents "heavy duty job"
var data = Rx.Observable.return(Math.random() * 1000).delay(2000);
//and we want to cache result
var cachedData = new Rx.AsyncSubject();
data.subscribe(cachedData);
cachedData.subscribe(function(data){
    //after 2 seconds, result is here and data is cached
    //next subscribe returns immediately data
    cachedData.subscribe(function(data2){ /*this is "instant"*/ });
});

首次调用cachedData上的subscribe时,调用"重载作业",2秒后结果保存在cachedDataAsyncSubject)中。cachedData上的任何其他后续subscribe立即返回并保存结果(因此缓存实现)。

我想实现的是在cachedData有效的时间段内"增加趣味性",当时间过去时,我想为新数据重新运行"重载作业",并在新的时间段再次缓存它,等等。

期望行为:

//pseudo code
cachedData.youShouldExpireInXSeconds(10);

//let's assume that all code is sequential from here
//this is 1.st run
cachedData.subscribe(function (data) {
    //this first subscription actually runs "heavy duty job", and
    //after 2 seconds first result data is here
});
//this is 2.nd run, just after 1.st run finished
cachedData.subscribe(function (data) {
    //this result is cached
});
//15 seconds later
// cacheData should expired
cachedData.subscribe(function (data) {
    //i'm expecting same behaviour as it was 1.st run:
    // - this runs new "heavy duty job"
    // - and after 2 seconds we got new data result
});

//....
//etc

我是Rx(Js)的新手,不知道如何通过冷却来实现这种热观察。

您所缺少的只是安排一个任务,在一段时间后用新的AsyncSubject替换您的cachedData。以下是如何将其作为一种新的Rx.Observable方法:

Rx.Observable.prototype.cacheWithExpiration = function(expirationMs, scheduler) {
    var source = this,
        cachedData = undefined;
    // Use timeout scheduler if scheduler not supplied
    scheduler = scheduler || Rx.Scheduler.timeout;
    return Rx.Observable.create(function (observer) {
        if (!cachedData) {
            // The data is not cached.
            // create a subject to hold the result
            cachedData = new Rx.AsyncSubject();
            // subscribe to the query
            source.subscribe(cachedData);
            // when the query completes, start a timer which will expire the cache
            cachedData.subscribe(function () {
                scheduler.scheduleWithRelative(expirationMs, function () {
                    // clear the cache
                    cachedData = undefined;
                });
            });
        }
        // subscribe the observer to the cached data
        return cachedData.subscribe(observer);
    });
};

用法:

// a *cold* observable the issues a slow query each time it is subscribed
var data = Rx.Observable.return(42).delay(5000);
// the cached query
var cachedData = data.cacheWithExpiration(15000);
// first observer must wait
cachedData.subscribe();
// wait 3 seconds
// second observer gets result instantly
cachedData.subscribe();
// wait 15 seconds
// observer must wait again
cachedData.subscribe();

一个简单的解决方案是为repeatWhen创建一个自定义的可管道操作符,该操作符的持续时间已过。以下是我的想法:

 export const refreshAfter = (duration: number) => (source: Observable<any>) =>
                                  source.pipe(
                                     repeatWhen(obs => obs.pipe(delay(duration))),
                                     publishReplay(1), 
                                     refCount());

然后我这样使用它:

const serverTime$ = this.environmentClient.getServer().pipe(map(s => s.localTime))
const cachedServerTime$ = serverTime.pipe(refreshAfter(5000)); // 5s cache

重要提示:这使用publishReplay(1),refCount(),因为shareReplay(2)不会取消订阅可观察的源,因此它将永远访问您的服务器。不幸的是,这导致在出现错误时,该错误将从publishReplay(1)refCount()重放。"新改进"的shareReplay即将推出。请参阅此处关于类似问题的注释。一旦这个"新"版本可用,这个答案就应该更新——但自定义运算符的美妙之处在于,你可以在一个地方修复它们。

相关内容

  • 没有找到相关文章

最新更新