有一个Broadcaster,它接受字符串并将它们附加到StringBuilder中。
我想测试一下。
我必须使用Thread#sleep
等待,同时广播程序完成字符串的处理。我想删除sleep
。
我尝试使用Control#debug()
,但没有成功。
public class BroadcasterUnitTest {
@Test
public void test() {
//prepare
Environment.initialize();
Broadcaster<String> sink = Broadcaster.create(Environment.newDispatcher()); //run broadcaster in separate thread (dispatcher)
StringBuilder sb = new StringBuilder();
sink
.observe(s -> sleep(100)) //long-time operation
.consume(sb::append);
//do
sink.onNext("a");
sink.onNext("b");
//assert
sleep(500);//wait while broadcaster finished (if comment this line then the test will fail)
assertEquals("ab", sb.toString());
}
private void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
我不熟悉Broadcaster
(由于这个问题很老,可能已经不推荐使用了),但以下3种方法通常会有所帮助:
-
当测试
Project-Reactor
的Flux
es和其他东西时,您可能更适合使用他们专门为此制作的测试库。他们的参考资料和Javadoc在这方面都很好,我将在这里复制一个不言自明的例子:@Test public void testAppendBoomError() { Flux<String> source = Flux.just("foo", "bar"); StepVerifier.create( appendBoomError(source)) .expectNext("foo") .expectNext("bar") .expectErrorMessage("boom") .verify(); }
-
您可以在
Flux
es和Mono
s上自己block()
,然后运行检查。请注意,如果发出错误,将导致异常。但有一种感觉,在某些情况下,您会发现自己需要编写更多的代码(例如,检查Flux
是否发出了两个项目X
&Y
,然后以错误终止),然后您将重新实现StepVerifier
。@Test public void testFluxOrMono() { Flux<String> source = Flux.just(2, 3); List<Integer> result = source .flatMap(i -> multiplyBy2Async(i)) .collectList() .block(); // run your asserts on the list. Reminder: the order may not be what you expect because of the `flatMap` // Or with a Mono: Integer resultOfMono = Mono.just(5) .flatMap(i -> multiplyBy2Async(i)) .map(i -> i * 4) .block(); // run your asserts on the integer }
-
您可以使用通用的异步测试解决方案,如CountDownLatch,但同样不推荐使用,在某些情况下会给您带来麻烦。例如,如果你事先不知道接收器的数量,你就需要使用其他东西。
根据上面的回答,我发现blockLast()
有帮助。
@Test
public void MyTest()
{
Logs.Info("Start test");
/* 1 */
// Make a request
WebRequest wr1 = new WebRequest("1", "2", "3", "4");
String json1 = wr1.toJson(wr1);
Logs.Info("Flux");
Flux<String> responses = controller.getResponses(json1);
/* 2 */
Logs.Info("Responses in");
responses.subscribe(s -> mySub.myMethod(s)); // Test for strings is in myMethod
Logs.Info("Test thread sleeping");
Thread.sleep(2000);
/* 3 */
Logs.Info("Test thread blocking");
responses.blockLast();
Logs.Info("Finish test");
}