1

Существует вещатель, который принимает строки и присоединяет их к StringBuilder.Project Reactor: подождите, пока закончится вещатель

Я хочу протестировать его.

Я должен использовать Thread#sleep, чтобы подождать, в то время как вещание завершает обработку строк. Я хочу удалить sleep.

Я попытался использовать Control#debug() безуспешно.

public class BroadcasterUnitTest { 

@Test 
public void test() { 
    //prepare 
    Environment.initialize(); 
    Broadcaster<String> sink = Broadcaster.create(Environment.newDispatcher()); //run broadcaster in separate thread (dispatcher) 
    StringBuilder sb = new StringBuilder(); 
    sink 
      .observe(s -> sleep(100)) //long-time operation 
      .consume(sb::append); 

    //do 
    sink.onNext("a"); 
    sink.onNext("b"); 

    //assert 
    sleep(500);//wait while broadcaster finished (if comment this line then the test will fail) 
    assertEquals("ab", sb.toString()); 
} 

private void sleep(int millis) { 
    try { 
     Thread.sleep(millis); 
    } catch (InterruptedException e) { 
     throw new RuntimeException(e); 
    } 
} 
} 

ответ

0

Я не знаком с Broadcaster (и это, вероятно, не рекомендуется, так как вопрос старый), но эти 3 способа могут быть полезны в целом:

  1. При тестировании Project-Reactor «s Flux эс и т. д., вам, вероятно, лучше использовать свою тестовую библиотеку, созданную специально для этого. Their reference и Javadoc на этой части довольно хорошо, и я просто скопировать пример, который говорит сам за себя здесь:

    @Test 
    public void testAppendBoomError() { 
        Flux<String> source = Flux.just("foo", "bar"); 
        StepVerifier.create( 
        appendBoomError(source)) 
        .expectNext("foo") 
        .expectNext("bar") 
        .expectErrorMessage("boom") 
        .verify(); 
    } 
    
  2. Вы можете просто block() сами по Flux эс и Mono с, а затем запустить проверку , Обратите внимание, что если ошибка испускается, это приведет к исключению. Но имейте в виду, что вам понадобится написать больше кода для некоторых случаев (например, проверка Flux испустила 2 элемента: X & Y, а затем завершена с ошибкой), и вы затем будете повторно внедрять StepVerifier.

    @Test 
    public void testFluxOrMono() { 
        Flux<String> source = Flux.just(2, 3); 
        List<Integer> result = source 
         .flatMap(i -> multiplyBy2Async(i)) 
         .collectList() 
         .block(); 
        // run your asserts on the list. Reminder: the order may not be what you expect because of the `flatMap` 
        // Or with a Mono: 
        Integer resultOfMono = Mono.just(5) 
         .flatMap(i -> multiplyBy2Async(i)) 
         .map(i -> i * 4) 
         .block(); 
        // run your asserts on the integer 
    } 
    
  3. Вы можете использовать общие решения для ASync тестирования как CountDownLatch, но, опять же, не рекомендовал бы и даст вам неприятности в некоторых случаях. Например, если вы заранее не знаете количество приемников, вам нужно будет использовать что-то еще.