项目反应器:等待广播结束



有一个Broadcaster,它接受字符串并将它们附加到StringBuilder中。

我想测试一下。

我必须使用Thread#sleep等待,同时广播程序完成字符串的处理。我想删除sleep

我尝试使用Control#debug(),但没有成功。

public class BroadcasterUnitTest {
@Test
public void test() {
    //prepare
    Environment.initialize();
    Broadcaster<String> sink = Broadcaster.create(Environment.newDispatcher()); //run broadcaster in separate thread (dispatcher)
    StringBuilder sb = new StringBuilder();
    sink
            .observe(s -> sleep(100)) //long-time operation
            .consume(sb::append);
    //do
    sink.onNext("a");
    sink.onNext("b");
    //assert
    sleep(500);//wait while broadcaster finished (if comment this line then the test will fail)
    assertEquals("ab", sb.toString());
}
private void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
}

我不熟悉Broadcaster(由于这个问题很老,可能已经不推荐使用了),但以下3种方法通常会有所帮助:

  1. 当测试Project-ReactorFluxes和其他东西时,您可能更适合使用他们专门为此制作的测试库。他们的参考资料和Javadoc在这方面都很好,我将在这里复制一个不言自明的例子:

    @Test
    public void testAppendBoomError() {
      Flux<String> source = Flux.just("foo", "bar"); 
      StepVerifier.create( 
        appendBoomError(source)) 
        .expectNext("foo") 
        .expectNext("bar")
        .expectErrorMessage("boom") 
        .verify(); 
    }
    
  2. 您可以在Fluxes和Monos上自己block(),然后运行检查。请注意,如果发出错误,将导致异常。但有一种感觉,在某些情况下,您会发现自己需要编写更多的代码(例如,检查Flux是否发出了两个项目X&Y,然后以错误终止),然后您将重新实现StepVerifier

    @Test
    public void testFluxOrMono() {
      Flux<String> source = Flux.just(2, 3);
      List<Integer> result = source
            .flatMap(i -> multiplyBy2Async(i))
            .collectList()
            .block();
      // run your asserts on the list. Reminder: the order may not be what you expect because of the `flatMap`
      // Or with a Mono:
      Integer resultOfMono = Mono.just(5)
            .flatMap(i -> multiplyBy2Async(i))
            .map(i -> i * 4)
            .block();
      // run your asserts on the integer
    }
    
  3. 您可以使用通用的异步测试解决方案,如CountDownLatch,但同样不推荐使用,在某些情况下会给您带来麻烦。例如,如果你事先不知道接收器的数量,你就需要使用其他东西。

根据上面的回答,我发现blockLast()有帮助。

@Test
public void MyTest()
{
    Logs.Info("Start test");    
 
    /* 1 */
    // Make a request
    WebRequest wr1 = new WebRequest("1", "2", "3", "4");
    String json1 = wr1.toJson(wr1);
    
    Logs.Info("Flux");
    Flux<String> responses = controller.getResponses(json1);
    /* 2 */
    Logs.Info("Responses in");
    responses.subscribe(s -> mySub.myMethod(s)); // Test for strings is in myMethod
    
    Logs.Info("Test thread sleeping");
    Thread.sleep(2000);
    
    /* 3 */
    Logs.Info("Test thread blocking");
    responses.blockLast();
    
    Logs.Info("Finish test");
}

最新更新