如何使用/控制RxJava Observable.cache



我正在尝试使用RxJava缓存机制(RxJava2),但我似乎无法理解它是如何工作的,也无法控制缓存的内容,因为有cache运算符。

我想在发出新数据之前,用一些条件验证缓存的数据。

例如

someObservable.
repeat().
filter { it.age < maxAge }.
map(it.name).
cache() 

如果成功,我如何检查和过滤缓存值并发出它?如果不成功,我将请求一个新值。

由于该值会定期更改,因此在请求新的缓存之前,我需要验证缓存是否仍然有效。

还有ObservableCache<T>类,但我找不到任何使用它的资源

任何帮助都将不胜感激。谢谢

回放/缓存不是这样工作的。请先阅读#replay/#cache文档。

重播

这个操作符返回一个ConnectableObservable,它有一些连接到源的方法(#refCount/#connect/#autoConnect)。

当在没有过载的情况下应用#replay时,源订阅将被多播,并且连接中所有发出的值都将被重播。源订阅是惰性的,可以通过#refCount/#connect/#autoConnect连接到源。

返回一个ConnectableObservable,它共享对基础ObservableSource的单个订阅,该订阅将向任何未来的Observer重播其所有项目和通知。

在没有任何连接方法的情况下应用#中继(#refCount/#connect/#autoConnect)将不会在订阅上发出任何值

可连接的ObservableSource类似于普通的Observable Source,只是它在订阅时不会开始发出项,而是仅在调用其连接方法时才开始发出项。

重播(1)#autoConnect(-1)/#refCount(1)/#connect

应用重播(1)将缓存最后一个值,并在每个订阅上发出缓存的值。#autoConnect将立即连接打开连接,并保持打开状态,直到发生终端事件(onComplete,onError)#refCount是轻微的,但当所有订阅者都消失时,它将与源断开连接。当您需要等待时,当对可观察对象进行了所有订阅时,可以使用#connect-orepeator,以免错过值。

用法

#重播(1)——大部分应该在可观察的末尾使用。

sourcObs.
.filter()
.map()
.replay(bufferSize)
.refCount(connectWhenXSubsciberSubscribed) 

谨慎

当你观察到无限时,在没有缓冲区限制或过期日期的情况下应用#重播将导致内存泄漏

cache/cacheWithInitialCapacity

运算符类似于带有autoConnect(1)的#replay。运算符将缓存每个值,并在每个子脚本上重播。

只有当第一个下游订阅者订阅并维护对该ObservableSource的单个订阅时,运营商才会订阅。相反,返回ConnectableObservable的replay()运算符家族需要显式调用ConnectableObserver.connect()。注意:当您使用缓存Observer时,您牺牲了处理原点的能力,因此请注意不要在ObservableSources上使用此Observer,因为ObservableSource会发出无限或非常大量的项目,这些项目将占用内存。一个可能的解决方法是在应用cache()之前(也许之后)将takeUntil与谓词或另一个源一起应用。

示例

@Test
fun skfdsfkds() {
val create = PublishSubject.create<Int>()
val cacheWithInitialCapacity = create
.cacheWithInitialCapacity(1)
cacheWithInitialCapacity.subscribe()
create.onNext(1)
create.onNext(2)
create.onNext(3)
cacheWithInitialCapacity.test().assertValues(1, 2, 3)
cacheWithInitialCapacity.test().assertValues(1, 2, 3)
}

用法

使用缓存操作符,当您无法控制连接阶段时

当您希望ObservableSource缓存响应并且无法控制所有Observators的订阅/处置行为时,这很有用。

注意事项

与replay()一样,缓存是无限制的,可能会导致内存泄漏。

注意:容量提示不是缓存大小的上限。为此,可以考虑将replay(int)与ConnectableObservable.autoConnect()或类似功能结合使用。

进一步阅读

https://blog.danlew.net/2018/09/25/connectable-observables-so-hot-right-now/

https://blog.danlew.net/2016/06/13/multicasting-in-rxjava/

如果您的事件源(Observable)是一个昂贵的操作,例如从数据库读取,则不应该使用Subject来观察事件,因为这将为每个订阅者重复昂贵的操作。缓存对于无限流也可能是有风险的;内存不足";例外情况。更合适的解决方案可能是ConnectableObservable,它只执行一次源操作,并将更新后的值广播给所有订户。

这是一个代码示例。为了简化示例,我没有创建无限周期流,也没有包含错误处理。如果它能满足你的需要,请告诉我。

class RxJavaTest {
private final int maxValue = 50;
private final ConnectableObservable<Integer> source =
Observable.<Integer>create(
subscriber -> {
log("Starting Event Source");
subscriber.onNext(readFromDatabase());
subscriber.onNext(readFromDatabase());
subscriber.onNext(readFromDatabase());
subscriber.onComplete();
log("Event Source Terminated");
})
.subscribeOn(Schedulers.io())
.filter(value -> value < maxValue)
.publish();
void run() throws InterruptedException {
log("Starting Application");
log("Subscribing");
source.subscribe(value -> log("Subscriber 1: " + value));
source.subscribe(value -> log("Subscriber 2: " + value));
log("Connecting");
source.connect();
// Add sleep to give event source enough time to complete
log("Application Terminated");
sleep(4000);
}
private Integer readFromDatabase() throws InterruptedException {
// Emulate long database read time
log("Reading data from database...");
sleep(1000);
int randomValue = new Random().nextInt(2 * maxValue) + 1;
log(String.format("Read value: %d", randomValue));
return randomValue;
}
private static void log(Object message) {
System.out.println(
Thread.currentThread().getName() + " >> " + message
);
}
}

这是输出:

main >> Starting Application
main >> Subscribing
main >> Connecting
main >> Application Terminated
RxCachedThreadScheduler-1 >> Starting Event Source
RxCachedThreadScheduler-1 >> Reading data from database...
RxCachedThreadScheduler-1 >> Read value: 88
RxCachedThreadScheduler-1 >> Reading data from database...
RxCachedThreadScheduler-1 >> Read value: 42
RxCachedThreadScheduler-1 >> Subscriber 1: 42
RxCachedThreadScheduler-1 >> Subscriber 2: 42
RxCachedThreadScheduler-1 >> Reading data from database...
RxCachedThreadScheduler-1 >> Read value: 37
RxCachedThreadScheduler-1 >> Subscriber 1: 37
RxCachedThreadScheduler-1 >> Subscriber 2: 37
RxCachedThreadScheduler-1 >> Event Source Terminated.

注意以下内容:

  • 只有在对源调用connect()时,事件才会启动,而不是在观察者订阅源时
  • 每次事件更新只进行一次数据库调用
  • 过滤后的值不会发送给订阅者
  • 所有订阅者都在同一个线程中执行
  • 由于并发性,应用程序在处理事件之前终止。通常情况下,您的应用程序将在事件循环中运行,因此在慢速操作期间,您的程序将保持响应

相关内容

  • 没有找到相关文章

最新更新