如何在调用代码输入和基于定时器的输入之间交替,并使用反应式编程对其进行单元测试



要求

我正在尝试实现一个类,该类计算操作完成的估计剩余时间。我想将解决方案建立在RxJava的基础上——让类将更新后的估计值公开为Observable。

为了解决这个问题,我把我的解决方案精简到了最低限度,放弃了计算和所有其他的细节。

剩下的核心思想是,我们正在听取进度更新,但如果有一段时间没有进展(想象一下大量文件被复制,应用程序被一个特别大的文件"卡住"),我希望在等待来自实际来源的新信息的同时,定期发出更新。

(显然,在所描述的示例场景中,使用代码会观察到在处理文件时估计值越来越差)。

我还想对解决方案进行单元测试。

简化、可编译的实现(不起作用)

这是我的Watcher类(如果是这样的话,显然我不需要两个主题,但它反映了原始的、更复杂的实现的结构):

public class Watcher {
    public List<String> buffer = new ArrayList<>();
    public BehaviorSubject<String> retriever = BehaviorSubject.create();
    public BehaviorSubject<String> publisher = BehaviorSubject.create();
    public Watcher() {
        retriever
                // when no data is coming, start the timer
                .switchMap(
                        new Func1<String, Observable<String>>() {
                            @Override
                            public Observable<String> call(String s) {
                                return Observable
                                        .interval(1, 1, TimeUnit.SECONDS)
                                        .map(new Func1<Long, String>() {
                                            @Override
                                            public String call(Long tick) {
                                                return "tick " + tick;
                                            }
                                        });
                            }
                        })
                .doOnNext(
                        new Action1<String>() {
                            @Override
                            public void call(String str) {
                                buffer.add(str);
                            }
                        }
                )
                .subscribe(publisher);
    }
    public void add(String str) {
        retriever.onNext(str);
    }
    public Observable<String> asObservable() {
        return publisher.asObservable();
    }
}

测试代码(失败)

以下是使用TestScheduler模拟时间流的测试:

TestScheduler testScheduler = new TestScheduler();
TestSubscriber subscriber = new TestSubscriber();
Watcher watcher = new Watcher();
// adding this before the subscription occurs
// i'd also like to ensure that the observer "catches up" on whatever it missed
// - that's why I used PublishSubjects 
watcher.add("A");
watcher
        .asObservable()
        // this bit should get the whole thing running on TestScheduler
        // so that it reacts to artificial shifts of time
        .subscribeOn(testScheduler)
        .subscribe(subscriber);
watcher.add("B");
// this should get the timer going...
testScheduler.advanceTimeBy(5, TimeUnit.SECONDS);
// and here I expect it to get disabled
watcher.add("C");
// over to the ticking timer
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);

预期结果与实际结果

我的预期结果将接收:A,B,几次,C,几次

我得到的都是滴答声!看不到A、B或C。既不在testScheduler中,也不在Watcher自己的缓冲区中。

备注

有趣的是,实际的实现遇到了相反的问题:我得到的是实际的输入,而缺少的是记号。

所以很明显,我没有在我的简化版本中准确地重新创建它。然而,这个错误似乎具有类似的性质:由于某种原因,在两个来源之间切换并没有像我想象的那样起作用。

问题

我做错了什么?我是否误解了switchMap的工作原理?我是否滥用TestScheduler及其虚拟时间安排?

除了我可能犯的任何错误之外,有没有更好、更地道的替代方法可以完成它?

@Konrad,感谢您提出的详细问题和答案。我不太确定我是否完全理解你的要求,但在我看来startWith应该胜任这项工作。

此外,如果您询问"Rx-idomatic",从Transformer派生Watcher是一种很好的做法。这样可以更容易地在Rx链中重用您的方法。

这是一个完整的代码,它实现了我相信您想要实现的目标。它是Java 8,编写Rx代码更符合人体工程学。

package com.reactive;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.TestScheduler;
import rx.subjects.BehaviorSubject;
import sun.jvm.hotspot.utilities.Assert;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;

// According to https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators#transformational-operators
// you should use transformers to implement your own observables
class Watcher implements Observable.Transformer<String,String> {
    Scheduler _scheduler;
    // As you already realized in your answer you need to specify the scheduler if you want to control the interval observable
    public Watcher(Scheduler scheduler) {
       _scheduler = scheduler;
    }
    @Override
    public Observable<String> call(Observable<String> retriever) {
        return retriever.switchMap(s ->
                // Create the sequence of ticks
            Observable.interval(1,TimeUnit.SECONDS,_scheduler)
            .map(tick -> "tick " + tick)
            // but prepend the actual signal from the retriever
            .startWith(s)
            );
    }
}

public class Main {
    public static void main(String[] args) {
        TestScheduler testScheduler = new TestScheduler();
        BehaviorSubject<String> retriever = BehaviorSubject.create();
        ArrayList<String> results = new ArrayList<>();
        retriever.compose(new Watcher(testScheduler))
                .subscribe(s->results.add(s));
        retriever.onNext("A");
        retriever.onNext("B");
        testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);
        retriever.onNext("C");
        testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
        String result = String.join(" , ",results);
        Assert.that(result.equals("A , B , tick 0 , tick 1 , tick 2 , C , tick 0 , tick 1"),result);
    }
}

没有答案伤害了我的感情,但我咬紧牙关,继续努力,最终使它按预期工作。

我不确定我的实现是惯用的还是正确的(不是狭义的得出正确的结果,它确实做到了——但如果它不有力/不熟练,我不会发誓)。

我花了一些时间来阐述它,虽然如果这对任何人都有帮助,我会很高兴,但我还不是一个专家,如果有人提供他们的见解,我会非常感激,如果他们不同意,可能会纠正我的一些假设。

我通过三个基本步骤解决了这个问题。

1

我现在手动将testScheduler(总是使用虚拟时间调度)传递到嵌套的Observable.interval中。

因此,TestScheduler testScheduler = new TestScheduler();现在被转换为构造函数参数,

Observable.interval(1, 1, TimeUnit.SECONDS)

被取代

Observable.interval(1, 1, TimeUnit.SECONDS, testScheduler)

否则(显然)Observable.interval将继续在其自己的默认调度程序上运行,即使在我调试测试并且它在断点上停止时,它也会继续向缓冲区添加"ticks"。

注意,我已经使用了subscribeOn来尝试将整个东西放在testScheduler:上

watcher
    .asObservable()
    .subscribeOn(testScheduler)

关键是subscribeOn不会影响嵌套的可观察性:它们仍然会在默认的Scheduler上运行,您必须小心手动覆盖它如果我误解了这一点,请原谅你的想法

2

此位:

.switchMap(
        new Func1<String, Observable<String>>() {
            @Override
            public Observable<String> call(String s) {
                return Observable
                        .interval(1, 1, TimeUnit.SECONDS, testScheduler)
                        .map(new Func1<Long, String>() {
                            @Override
                            public String call(Long tick) {
                                return "tick " + tick;
                            }
                        });
            }
        })

仍然会让我在缓冲区中只剩下"记号"。"A"、"B"、"C"不见了(但是,在我预计A、B、C会出现的地方,勾号计数器会重置回0)。

为了解决这个问题,我不得不像这样加强实施:

.switchMap(
        new Func1<String, Observable<String>>() {
            @Override
            public Observable<String> call(String s) {
                return Observable
                        // this preserves the original item 
                        // that caused the switch (the A, B or C)
                        .just(s)
                        // and here we say we want it followed by ticks
                        .concatWith(
                                Observable.interval(1, 1, TimeUnit.SECONDS, scheduler)
                                // <snip>

它现在确实有效,但它已经嵌套得很深了。RxJava的主要卖点之一是有助于避免深度嵌套,并使数据流上的操作变得平坦那么,还有更好、更平坦的替代方案吗?我做错了吗这个问题仍然存在。

3

此时,watcher.buffer已经包含"A,B,tick 0,tick 1,tick 2,tick 3,tick 4,C,tick O,tick l 1"。我看到这很好。

然而,我的TestSubscriber subscriber仍然缺少第一个"A"(列表subscriber.getOnNextEvents()以"B"开头)。

的确,插入"A"发生在订阅之前:

watcher.add("A");
watcher
    .asObservable()
    .subscribeOn(testScheduler)
    .subscribe(subscriber);
watcher.add("B");

然而,正如我的问题中的评论所解释的,这是经过深思熟虑的,因为Watcher.publisher(我们订阅的)是BehaviorSubject实例,因此对它来说应该不是问题。

根据文件,BehaviorSubject

发出其观察到的最新项目的主题每个订阅的观察者的后续观察项目

因此,它应该将它能回忆起的最新项目传递给任何新订阅的Observer。我知道"A"一定穿过了可观测到的,因为它存在于watcher.buffer中。为什么publisher不能通过首先传递缓存的"A"来对subscribe作出反应?

我花了一段时间才明白。

答案似乎是:随着时间虚拟化,我们必须一路走下去。如果我们不手动按下时钟,时间就会完全冻结,根本不会执行任何操作。

因此,它可以很简单地修复:

watcher.add("A");
watcher
    .asObservable()
    .subscribeOn(testScheduler)
    .subscribe(subscriber);
// "go on, do your duty", as great Stannis would say
testScheduler.triggerActions();
// ^ if it didn't happen, "B" would be pushed in the exact same moment as "A",
// and as a result "A" is lost.
watcher.add("B");

triggerActionsTestScheduler宇宙中移动世界一帧,在这种情况下,将"a"沿链向下冲。根据文件再次,这种方法:

触发任何尚未触发且计划在此计划程序的当前时间或之前触发。

直到现在,CCD_ 29与CCD_。香槟瓶塞像烟花一样熄灭(每个编码者都知道当世界恢复正常时会有一种如释重负的感觉):)

最新更新