推进 RxJava 的 TestScheduler 以阻止调用 delay()



使用RxJava2,我有一个将反应世界与非反应世界联系起来的类。在此类中,有一个重写的方法,该方法在可能较长的处理后返回一些值。这种处理使用Single.create(emitter -> ...)嵌入到响应式世界中

处理时间对我来说至关重要,因为业务逻辑存在差异,无论第二个订阅者是在处理运行时还是在处理完成(或取消(之后出现。

现在我正在测试这个类 - 因此,我需要很长的处理时间只是虚拟的。RxJava的TestScheduler来拯救!

要为模拟延迟的桥接基类创建测试实现,我需要在 TestScheduler 上进行阻塞 (!( 调用,其中时间前进在单独的线程上触发。(否则永远不会触发时间前进。

我的问题是,我需要至少在"昂贵的计算"阻塞之前延迟时间前进 - 否则在延迟运算符激活之前时间会发生变化,因此,它会等到另一个永远不会发生的前进。

我可以通过在呼叫之前呼叫Thread.sleep(100)来及时前进来解决这个问题 - 但这对我来说似乎有点丑陋......等多久?对于多个测试,时间加起来,但由于时间问题而失败......呸。

知道如何以希望更干净的方式测试这种情况吗?谢谢!

import io.reactivex.Completable;
import io.reactivex.Single;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.schedulers.TestScheduler;
import java.util.concurrent.TimeUnit;
public class TestSchedulerBlocking {
public static void main(String[] args) throws Exception {
TestScheduler testScheduler = new TestScheduler();
// the task - in reality, this is a task with a overridden method returning some value, where I need to delay the return in a test-implementation.
Single<String> single = Single.create(emitter -> {
System.out.println("emitting on " + Thread.currentThread() + " ...");
// This is the core of my problem: I need some way to test a (synchronous) delay before the next line executes
// (e.g. method returns, or, in another case, an exception is thrown).
// Thread.sleep(<delay>) would be straight forward, but I want to use TestScheduler's magic time-advancing in my tests...
// Using the usual non-blocking methods of RX, everything works fine for testing using the testScheduler, but here it doesn't help.
// Here I need to synchronize it somehow, that the advancing in time is done *after* this call is blocking.
// I tried a CountDownLatch in doOnSubscribe, but apparently, that's executed too early for me - I'd need doAfterSubscribe or so...
// Final word on architecture: I'm dealing with the bridging of the reactive to the non-reactive world, so I can't just change the architecture.
Completable.complete()
.delay(3, TimeUnit.SECONDS, testScheduler)
.doOnSubscribe(__ -> System.out.println("onSubcribe: completable"))
.blockingAwait();
System.out.println("<< blocking released >>");
// this has to be delayed! Might be, for example, a return or an exception in real world.
emitter.onSuccess("FooBar!");
});

System.out.println("running the task ...");
TestObserver<String> testObserver = single.doOnSubscribe(__ -> System.out.println("onSubcribe: single"))
.subscribeOn(Schedulers.newThread())
.test();
// Without this sleep, the advancing in testScheduler's time is done *before* the doIt() is executed,
// and therefore the blockingAwait() blocks until infinity...
System.out.println("---SLEEPING---");
Thread.sleep(100);
// virtually advance the blocking delay 
System.out.println("---advancing time---");
testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);
// wait for the task to complete
System.out.println("await...");
testObserver.await();
// assertions on task results, etc.
System.out.println("finished. now do some testing... values (expected [FooBar!]): " + testObserver.values());
}
}

[注意:我昨天以更复杂和冗长的方式问了这个问题 - 但由于我的办公室太热了,它有几个缺陷和错误。因此,我删除了它,现在它更浓缩为真正的问题。

如果我正确理解您的问题,您有一个长时间运行的服务,并且您希望能够测试将受长期运行服务影响的服务的订阅。我通过以下方式处理这个问题。

按现有方式定义服务,使用TestScheduler来控制其时间进度。然后使用相同的测试调度程序定义测试刺激。

这将模拟阻止实体。不要使用"阻止"。

Single<String> single = Observable.timer( 3, TimeUnit.SECONDS, scheduler )
.doOnNext( _l -> {} )
.map( _l -> "FooBar!" )
.take( 1 )
.toSingle();
// set up asynchronous operations
Observable.timer( 1_500, TimeUnit.MILLISECONDS, scheduler )
.subscribe( _l -> performTestAction1() );
Observable.timer( 1_900, TimeUnit.MILLISECONDS, scheduler )
.subscribe( _l -> performTestAction2() );
Observable.timer( 3_100, TimeUnit.MILLISECONDS, scheduler )
.subscribe( _l -> performTestAction3() );
System.out.println("running the task ...");
TestObserver<String> testObserver = single.doOnSubscribe(__ -> System.out.println("onSubcribe: single"))
.subscribeOn(Schedulers.newThread())
.test();
// virtually advance the blocking delay 
System.out.println("---advancing time---");
testScheduler.advanceTimeBy(4, TimeUnit.SECONDS);

您应该看到发生performTestAction1(),然后performTestAction2(),然后single完成,然后performTestAction3()。所有这些都是由测试调度程序驱动的。

相关内容

  • 没有找到相关文章

最新更新