TestScheduler不能在RxJava上工作



我试图测试一个函数,其中流的元素在延迟后一个接一个地调度,我能够使用Thread.sleep获得我的测试工作。然而,当我使用TestScheduler.advanceTimeBy时,我不能得到任何结果。

查看代码:

public Observable<Object> getDelayedObjects(Observable<Observable<Object>> objectsStreams) {
    objectsStreams.concatMap(objectsStream ->
        objectsStream.repeat().concatMap(object ->
            Observable.just(object)
                      .delay(getDuration(object), TimeUnit.MILLISECONDS)));
}

和测试代码:

TestScheduler testScheduler = new TestScheduler();
BehaviorSubject<Observable<Object>> objectStreamSubject = BehaviorSubject.create(objectsStream);
model.getDelayedObjects(objectStreamSubject)
        .observeOn(testScheduler)
        .subscribeOn(testScheduler)
        .subscribe(testSubscriber);
testScheduler.triggerActions();
//Thread.sleep(900) works with the default scheduler
testScheduler.advanceTimeBy(900, TimeUnit.MILLISECONDS);
testSubscriber.assertReceivedOnNext(objects);

更新:

检查TestScheduler的使用情况,我发现通常将调度器传递给delay函数。因此,我能够通过提供调度器作为方法getDelayedObjectsdelay的参数来通过测试。然而,我仍然不明白为什么它之前不工作。

默认情况下,delay操作符使用计算调度器执行基于时间的延迟。这些信息可以在该方法的文档中找到。查找@SchedulerSupport注释中的值,在本例中为io.reactivex:computation

出于测试目的,您将不得不用TestScheduler替换计算调度程序。为了能够进行替换,您必须使用delay操作符的许多重写之一,该操作符接受Scheduler

相关内容

  • 没有找到相关文章

最新更新