对observable的客户端进行单元测试



我有以下方法go()我想测试:

private Pair<String, String> mPair;
public void go() {
    Observable.zip(
            mApi.webCall(),
            mApi.webCall2(),
            new Func2<String, String, Pair<String, String>>() {
                @Override
                public Pair<String, String> call(String s, String s2) {
                    return new Pair(s, s2);
                }
            }
    )
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<Pair<String, String>>() {
                @Override
                public void call(Pair<String, String> pair) {
                    mApi.webCall3(pair.first, pair.second);
                }
            });
}

该方法使用Observable.zip()异步执行http请求,并将它们合并在一个Pair中。最后,使用前面这些请求的结果执行另一个http请求。

我想验证调用go()方法是否会发出webCall()webCall2()请求,然后是webCall3(String, String)请求。因此,我希望通过以下测试(使用Mockito监视Api):

@Test
public void testGo() {
    /* Given */
    Api api = spy(new Api() {
        @Override
        public Observable<String> webCall() {
            return Observable.just("First");
        }
        @Override
        public Observable<String> webCall2() {
            return Observable.just("second");
        }
        @Override
        public void webCall3() {
        }
    });
    Test test = new Test(api);
    /* When */
    test.go();
    /* Then */
    verify(api).webCall();
    verify(api).webCall2();
    verify(api).webCall3("First", "second");
}

然而,当运行这个时,web调用是异步执行的,并且我的测试在订阅者完成之前执行断言,导致我的测试失败。

我读到您可以使用RxJavaSchedulersHookRxAndroidSchedulersHook为所有方法返回Schedulers.immediate(),但这会导致测试无限期运行。

我正在本地JVM上运行单元测试。

我如何才能实现这一点,最好不必修改go()的签名?

(lambda thanks to retrolambda)

对于初学者,我会将go重新表述为:

private Pair<String, String> mPair;
public Observable<Pair<String, String>> go() {
    return Observable.zip(
                mApi.webCall(),
                mApi.webCall2(),
                (String s, String s2) -> new Pair(s, s2)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(pair -> mPair = pair);
}
public Pair<String, String> getPair() {
    return mPair;
}

doOnNext允许在有人订阅Observable

时拦截链中正在处理的值。

然后,我将像这样调用测试:

Pair result = test.go().toBlocking().lastOrDefault(null);

然后你可以测试result是什么

我会在您的测试中使用TestSchedulerTestSubscriber。为了使用它,您必须接收组成zip的可观察对象,以便您可以使用testScheduler订阅该工作。此外,还必须对调度程序进行参数化。您不必更改go方法的签名,但必须在底层功能中参数化调度器。您可以通过构造函数注入调度器,通过继承覆盖protected字段,或者调用package保护的重载。我在编写示例时假设了一个接受调度器作为参数并返回Observable的重载。

TestScheduler为您提供了一种以可预测的可重复的方式触发异步操作符行为的同步方法。TestSubscriber为您提供了一种等待终止并断言接收到的值和信号的方法。此外,您可能还需要知道,默认情况下,delay(long, TimeUnit)操作符在计算调度器上调度。您还需要在那里使用testScheduler。

Scheduler ioScheduler = Schedulers.io();
Scheduler mainThreadScheduler =  AndroidSchedulers.mainThread();
public void go() {
    go(subscribeOnScheduler, mainThreadScheduler).toBlocking().single();
}
/*package*/ Observable<Pair<String, String>> go(Scheduler ioScheduler, Scheduler mainThreadScheduler) {
    return Observable.zip(
            mApi.webCall(),
            mApi.webCall2(),
            new Func2<String, String, Pair<String, String>>() {
                @Override
                public Pair<String, String> call(String s, String s2) {
                    return new Pair(s, s2);
                }
            })
            .doOnNext(new Action1<Pair<String, String>>() {
                @Override
                public void call(Pair<String, String>() {
                    mApi.webCall3(pair.first, pair.second);
                })
            })
            .subscribeOn(ioScheduler)
            .observeOn(mainThreadScheduler);
}

测试代码

@Test
public void testGo() {
    /* Given */
    TestScheduler testScheduler = new TestScheduler();
    Api api = spy(new Api() {
        @Override
        public Observable<String> webCall() {
            return Observable.just("First").delay(1, TimeUnit.SECONDS, testScheduler);
        }
        @Override
        public Observable<String> webCall2() {
            return Observable.just("second");
        }
        @Override
        public void webCall3() {
        }
    });
    Test test = new Test(api);
    /* When */
    test.go(testScheduler, testScheduler).subscribe(subscriber);
    testScheduler.triggerActions();
    subscriber.awaitTerminalEvent();
    /* Then */
    verify(api).webCall();
    verify(api).webCall2();
    verify(api).webCall3("First", "second");
}

我已经发现,我可以检索我的Schedulers在一个非静态的方式,基本上注入到我的客户端类。SchedulerProvider替换了对Schedulers.x()的静态调用:

public interface SchedulerProvider {
    Scheduler io();
    Scheduler mainThread();
}

生产实现委托回Schedulers:

public class SchedulerProviderImpl implements SchedulerProvider {
    public static final SchedulerProvider INSTANCE = new SchedulerProviderImpl();
    @Override
    public Scheduler io() {
        return Schedulers.io();
    }
    @Override
    public Scheduler mainThread() {
        return AndroidSchedulers.mainThread();
    }
}

但是,在测试期间,我可以创建一个TestSchedulerProvider:

public class TestSchedulerProvider implements SchedulerProvider {
    private final TestScheduler mIOScheduler = new TestScheduler();
    private final TestScheduler mMainThreadScheduler = new TestScheduler();
    @Override
    public TestScheduler io() {
        return mIOScheduler;
    }
    @Override
    public TestScheduler mainThread() {
        return mMainThreadScheduler;
    }
}

现在我可以将SchedulerProvider注入到包含go()方法的Test类中:

class Test {
    /* ... */
    Test(Api api, SchedulerProvider schedulerProvider) {
        mApi = api;
        mSchedulerProvider = schedulerProvider;
    }
    void go() {
        Observable.zip(
            mApi.webCall(),
            mApi.webCall2(),
            new Func2<String, String, Pair<String, String>>() {
                @Override
                public Pair<String, String> call(String s, String s2) {
                    return new Pair(s, s2);
                }
            }
      )
            .subscribeOn(mSchedulerProvider.io())
            .observeOn(mSchedulerProvider.mainThread())
            .subscribe(new Action1<Pair<String, String>>() {
                @Override
                public void call(Pair<String, String> pair) {
                    mApi.webCall3(pair.first, pair.second);
                }
            });
    }
}

测试如下:

@Test
public void testGo() {
    /* Given */
    TestSchedulerProvider testSchedulerProvider = new TestSchedulerProvider();
    Api api = spy(new Api() {
        @Override
        public Observable<String> webCall() {
            return Observable.just("First");
        }
        @Override
        public Observable<String> webCall2() {
            return Observable.just("second");
        }
        @Override
        public void webCall3() {
        }
    });
    Test test = new Test(api, testSchedulerProvider);
    /* When */
    test.go();
    testSchedulerProvider.io().triggerActions();
    testSchedulerProvider.mainThread().triggerActions();
    /* Then */
    verify(api).webCall();
    verify(api).webCall2();
    verify(api).webCall3("First", "second");
}

我有一个类似的问题,需要多走一步才能解决。:

existingObservable
     .zipWith(Observable.interval(100, TimeUnit.MILLISECONDS), new Func1<> ...)
     .subscribeOn(schedulersProvider.computation())

仍然没有使用所提供的TestScheduler schedulersProvider返回。为了工作,必须在我正在压缩的各个流上指定. subscribeon ()。:

existingObservable.subscribeOn(schedulersProvider.computation())
     .zipWith(Observable.interval(100, TimeUnit.MILLISECONDS).subscribeOn(schedulersProvider.computation()), new Func1<> ...)
     .subscribeOn(schedulersProvider.computation())

注意,schedulersProvider是一个模拟,返回我的测试的TestScheduler !

相关内容

  • 没有找到相关文章

最新更新