重播()运算符的自定义过期



我有一个Observable<Snapshot>流,订阅后,它会重播为每个活动实体生成的每个快照的事件日志(以及订阅后的任何新快照(。此重播可以包含同一实体的多个Snapshot。如果一个实体在过去24小时内有快照,则该实体被视为处于活动状态。

我正在尝试设置一个热可观察对象,当订阅时,它将只重播给定实体的最新Snapshot以及订阅后的任何新快照,以显示在UI的表中。

这是我的代码:

Observable<Snapshot> snapshots = getContinuousSnapshotsStream();

var cache = snapshots.groupBy(Snapshot::getId)
.map(g -> {
var o = g.timeout(1, TimeUnit.DAYS)
.onErrorComplete()
.replay(1);
o.connect();
return o.hide();
})
.replay(); // memory leak, holds onto references to groups that have timed out already
// start cache
cache.connect();

// each client UI subscription will flatten this
cache.flatMap(o -> o);

正如您从评论中看到的,缓存中的回复将保留在已经超时的组中。我需要一种方法,oonComplete(),将其从重播中删除。

有没有RX操作员可以让我在不管理单独缓存的情况下实现我的目标?

不幸的是,这无法用标准运算符实现。您必须编写一个自定义运算符,它可以通过使用一些标准运算符和手动内部状态管理来近似。这就是我想到的:

record MessageValue<T>(T value) { }
record MessageSubscriber<T>(Emitter<T> emitter) { }
record MessageDisposed<T>(Emitter<T> emitter) { }
record MessageTimeout<K>(K key, long id) { }
record CachedValue<T>(T value, long id, Disposable timeout) { }
public static <T, K> Observable<T> groupCacheLatestWithTimeout(
Observable<T> source, 
long timeout, TimeUnit unit, Scheduler scheduler,
Function<T, K> keySelector) {
Subject<Object> messageQueue = PublishSubject.create().toSerialized(); 

Map<K, CachedValue<T>> cache = new LinkedHashMap<>();
List<Emitter<T>> emitters = new ArrayList<>();

AtomicLong idGenerator = new AtomicLong();
var result = Observable.<T>create(emitter -> {
messageQueue.onNext(new MessageSubscriber<T>(emitter));
emitter.setCancellable(() -> {
messageQueue.onNext(new MessageDisposed<T>(emitter));
});
});

messageQueue.subscribe(message -> {
if (message instanceof MessageValue) {
var mv = ((MessageValue<T>)message).value;
var key = keySelector.apply(mv);

var old = cache.get(key);
if (old != null) {
old.timeout.dispose();
}

var id = idGenerator.incrementAndGet();

var dispose = scheduler.scheduleDirect(() -> {
messageQueue.onNext(new MessageTimeout<K>(key, id));
});

cache.put(key, new CachedValue<T>(mv, id, dispose));

for (var emitter : emitters) {
emitter.onNext(mv);
}
}
else if (message instanceof MessageSubscriber) {

var me = ((MessageSubscriber<T>)message).emitter;
emitters.add(me);

for (var entry : cache.values()) {
me.onNext(entry.value);
}
}
else if (message instanceof MessageDisposed) {
var md = ((MessageDisposed<T>)message).emitter;
emitters.remove(md);
}
else if (message instanceof MessageTimeout) {
var mt = ((MessageTimeout<K>)message);

var entry = cache.get(mt.key);
if (entry.id == mt.id) {
cache.remove(mt.key);
}
}
});

source.subscribe(value -> {
messageQueue.onNext(new MessageValue<>(value));
});
return result;
}

需要做的是创建一个事件循环并序列化与缓存的交互:缓存最新的上游项,使缓存项超时,管理新的订阅服务器和重播,删除订阅服务器。

你可以这样测试:

var subject = PublishSubject.<String>create();
var sched = new TestScheduler();
var output = groupCacheLatestWithTimeout(subject, 
5, TimeUnit.SECONDS, sched, 
k -> k.substring(0, 2));
var to1 = output.test();
to1.assertEmpty();
subject.onNext("g1-1");
to1.assertValuesOnly("g1-1");
subject.onNext("g1-2");
to1.assertValuesOnly("g1-1", "g1-2");
var to2 = output.test();
to2.assertValuesOnly("g1-2");
sched.advanceTimeBy(10, TimeUnit.SECONDS);
to1.assertValuesOnly("g1-1", "g1-2");
to2.assertValuesOnly("g1-2");
var to3 = output.test();
to3.assertEmpty();
subject.onNext("g1-3");
to1.assertValuesOnly("g1-1", "g1-2", "g1-3");
to2.assertValuesOnly("g1-2", "g1-3");
to3.assertValuesOnly("g1-3");
to1.dispose();
subject.onNext("g1-4");
to1.assertValuesOnly("g1-1", "g1-2", "g1-3");
to2.assertValuesOnly("g1-2", "g1-3", "g1-4");
to3.assertValuesOnly("g1-3", "g1-4");

这是我使用的解决方案。虽然我主观上认为这比@akarnkd的回答更简单,但如果没有先阅读答案,我是无法想出这个答案的。

Observable<Snapshot> snapshots = getContinuousSnapshotsStream();
var scheduler = Schedulers.single(); // synchronizer to ensure no race condition where we miss events.
var cache = new LinkedHashMap<String, Observable<Snapshot>>();
var events = snapshots.groupBy(Snapshot::getId)
.observeOn(scheduler)
.map(g -> {
var o = g.timeout(1, TimeUnit.DAYS)
.doFinally(() -> scheduler.scheduleDirect(() -> cache.remove(g.getKey())))
.replay(1);
cache.put(g.getKey(), o);
return o.autoConnect(-1);
})
.publish()
.autoConnect(-1);
// each client UI subscription would subscribe to this:
var sub = Observable.merge(events, Observable.fromIterable(cache.values()))
.subscribeOn(scheduler)
.flatmap(r -> r);

最新更新