要求
我正在尝试实现一个类,该类计算操作完成的估计剩余时间。我想将解决方案建立在RxJava的基础上——让类将更新后的估计值公开为Observable。
为了解决这个问题,我把我的解决方案精简到了最低限度,放弃了计算和所有其他的细节。
剩下的核心思想是,我们正在听取进度更新,但如果有一段时间没有进展(想象一下大量文件被复制,应用程序被一个特别大的文件"卡住"),我希望在等待来自实际来源的新信息的同时,定期发出更新。
(显然,在所描述的示例场景中,使用代码会观察到在处理文件时估计值越来越差)。
我还想对解决方案进行单元测试。
简化、可编译的实现(不起作用)
这是我的Watcher
类(如果是这样的话,显然我不需要两个主题,但它反映了原始的、更复杂的实现的结构):
public class Watcher {
public List<String> buffer = new ArrayList<>();
public BehaviorSubject<String> retriever = BehaviorSubject.create();
public BehaviorSubject<String> publisher = BehaviorSubject.create();
public Watcher() {
retriever
// when no data is coming, start the timer
.switchMap(
new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
return Observable
.interval(1, 1, TimeUnit.SECONDS)
.map(new Func1<Long, String>() {
@Override
public String call(Long tick) {
return "tick " + tick;
}
});
}
})
.doOnNext(
new Action1<String>() {
@Override
public void call(String str) {
buffer.add(str);
}
}
)
.subscribe(publisher);
}
public void add(String str) {
retriever.onNext(str);
}
public Observable<String> asObservable() {
return publisher.asObservable();
}
}
测试代码(失败)
以下是使用TestScheduler
模拟时间流的测试:
TestScheduler testScheduler = new TestScheduler();
TestSubscriber subscriber = new TestSubscriber();
Watcher watcher = new Watcher();
// adding this before the subscription occurs
// i'd also like to ensure that the observer "catches up" on whatever it missed
// - that's why I used PublishSubjects
watcher.add("A");
watcher
.asObservable()
// this bit should get the whole thing running on TestScheduler
// so that it reacts to artificial shifts of time
.subscribeOn(testScheduler)
.subscribe(subscriber);
watcher.add("B");
// this should get the timer going...
testScheduler.advanceTimeBy(5, TimeUnit.SECONDS);
// and here I expect it to get disabled
watcher.add("C");
// over to the ticking timer
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
预期结果与实际结果
我的预期结果将接收:A,B,几次,C,几次
我得到的都是滴答声!看不到A、B或C。既不在testScheduler
中,也不在Watcher
自己的缓冲区中。
备注
有趣的是,实际的实现遇到了相反的问题:我得到的是实际的输入,而缺少的是记号。
所以很明显,我没有在我的简化版本中准确地重新创建它。然而,这个错误似乎具有类似的性质:由于某种原因,在两个来源之间切换并没有像我想象的那样起作用。
问题
我做错了什么?我是否误解了switchMap
的工作原理?我是否滥用TestScheduler
及其虚拟时间安排?
除了我可能犯的任何错误之外,有没有更好、更地道的替代方法可以完成它?
@Konrad,感谢您提出的详细问题和答案。我不太确定我是否完全理解你的要求,但在我看来startWith
应该胜任这项工作。
此外,如果您询问"Rx-idomatic",从Transformer
派生Watcher
是一种很好的做法。这样可以更容易地在Rx链中重用您的方法。
这是一个完整的代码,它实现了我相信您想要实现的目标。它是Java 8,编写Rx代码更符合人体工程学。
package com.reactive;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.TestScheduler;
import rx.subjects.BehaviorSubject;
import sun.jvm.hotspot.utilities.Assert;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
// According to https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators#transformational-operators
// you should use transformers to implement your own observables
class Watcher implements Observable.Transformer<String,String> {
Scheduler _scheduler;
// As you already realized in your answer you need to specify the scheduler if you want to control the interval observable
public Watcher(Scheduler scheduler) {
_scheduler = scheduler;
}
@Override
public Observable<String> call(Observable<String> retriever) {
return retriever.switchMap(s ->
// Create the sequence of ticks
Observable.interval(1,TimeUnit.SECONDS,_scheduler)
.map(tick -> "tick " + tick)
// but prepend the actual signal from the retriever
.startWith(s)
);
}
}
public class Main {
public static void main(String[] args) {
TestScheduler testScheduler = new TestScheduler();
BehaviorSubject<String> retriever = BehaviorSubject.create();
ArrayList<String> results = new ArrayList<>();
retriever.compose(new Watcher(testScheduler))
.subscribe(s->results.add(s));
retriever.onNext("A");
retriever.onNext("B");
testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);
retriever.onNext("C");
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
String result = String.join(" , ",results);
Assert.that(result.equals("A , B , tick 0 , tick 1 , tick 2 , C , tick 0 , tick 1"),result);
}
}
没有答案伤害了我的感情,但我咬紧牙关,继续努力,最终使它按预期工作。
我不确定我的实现是惯用的还是正确的(不是狭义的得出正确的结果,它确实做到了——但如果它不有力/不熟练,我不会发誓)。
我花了一些时间来阐述它,虽然如果这对任何人都有帮助,我会很高兴,但我还不是一个专家,如果有人提供他们的见解,我会非常感激,如果他们不同意,可能会纠正我的一些假设。
我通过三个基本步骤解决了这个问题。
1
我现在手动将testScheduler
(总是使用虚拟时间调度)传递到嵌套的Observable.interval
中。
因此,TestScheduler testScheduler = new TestScheduler();
现在被转换为构造函数参数,
Observable.interval(1, 1, TimeUnit.SECONDS)
被取代
Observable.interval(1, 1, TimeUnit.SECONDS, testScheduler)
否则(显然)Observable.interval
将继续在其自己的默认调度程序上运行,即使在我调试测试并且它在断点上停止时,它也会继续向缓冲区添加"ticks"。
注意,我已经使用了subscribeOn
来尝试将整个东西放在testScheduler
:上
watcher
.asObservable()
.subscribeOn(testScheduler)
关键是subscribeOn
不会影响嵌套的可观察性:它们仍然会在默认的Scheduler
上运行,您必须小心手动覆盖它如果我误解了这一点,请原谅你的想法
2
此位:
.switchMap(
new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
return Observable
.interval(1, 1, TimeUnit.SECONDS, testScheduler)
.map(new Func1<Long, String>() {
@Override
public String call(Long tick) {
return "tick " + tick;
}
});
}
})
仍然会让我在缓冲区中只剩下"记号"。"A"、"B"、"C"不见了(但是,在我预计A、B、C会出现的地方,勾号计数器会重置回0)。
为了解决这个问题,我不得不像这样加强实施:
.switchMap(
new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
return Observable
// this preserves the original item
// that caused the switch (the A, B or C)
.just(s)
// and here we say we want it followed by ticks
.concatWith(
Observable.interval(1, 1, TimeUnit.SECONDS, scheduler)
// <snip>
它现在确实有效,但它已经嵌套得很深了。RxJava的主要卖点之一是有助于避免深度嵌套,并使数据流上的操作变得平坦那么,还有更好、更平坦的替代方案吗?我做错了吗这个问题仍然存在。
3
此时,watcher.buffer
已经包含"A,B,tick 0,tick 1,tick 2,tick 3,tick 4,C,tick O,tick l 1"。我看到这很好。
然而,我的TestSubscriber subscriber
仍然缺少第一个"A"(列表subscriber.getOnNextEvents()
以"B"开头)。
的确,插入"A"发生在订阅之前:
watcher.add("A");
watcher
.asObservable()
.subscribeOn(testScheduler)
.subscribe(subscriber);
watcher.add("B");
然而,正如我的问题中的评论所解释的,这是经过深思熟虑的,因为Watcher.publisher
(我们订阅的)是BehaviorSubject
实例,因此对它来说应该不是问题。
根据文件,BehaviorSubject
是
发出其观察到的最新项目的主题每个订阅的观察者的后续观察项目
因此,它应该将它能回忆起的最新项目传递给任何新订阅的Observer。我知道"A"一定穿过了可观测到的,因为它存在于watcher.buffer
中。为什么publisher
不能通过首先传递缓存的"A"来对subscribe
作出反应?
我花了一段时间才明白。
答案似乎是:随着时间虚拟化,我们必须一路走下去。如果我们不手动按下时钟,时间就会完全冻结,根本不会执行任何操作。
因此,它可以很简单地修复:
watcher.add("A");
watcher
.asObservable()
.subscribeOn(testScheduler)
.subscribe(subscriber);
// "go on, do your duty", as great Stannis would say
testScheduler.triggerActions();
// ^ if it didn't happen, "B" would be pushed in the exact same moment as "A",
// and as a result "A" is lost.
watcher.add("B");
triggerActions
在TestScheduler
宇宙中移动世界一帧,在这种情况下,将"a"沿链向下冲。根据文件再次,这种方法:
触发任何尚未触发且计划在此计划程序的当前时间或之前触发。
直到现在,CCD_ 29与CCD_。香槟瓶塞像烟花一样熄灭(每个编码者都知道当世界恢复正常时会有一种如释重负的感觉):)