2017-01-25 3 views
1

Я попробовал ниже, чтобы проверить sereialize().RxJava 2.x: serialize() не работает

Я позвонил onNext 1,000,000 раз, чтобы сосчитать 2 разных темы. Тогда я ожидал получить 2 000 000 в onComplete.

Однако я не мог получить ожидаемое значение.

private static int count = 0; 

private static void setCount(int value) { 
    count = value; 
} 

private static final int TEST_LOOP = 10; 

private static final int NEXT_LOOP = 1_000_000; 

@Test 
public void test() throws Exception { 

    for (int test = 0; test < TEST_LOOP; test++) { 
    Flowable.create(emitter -> { 
     ExecutorService service = Executors.newCachedThreadPool(); 
     emitter.setCancellable(() -> service.shutdown()); 

     Future<Boolean> future1 = service.submit(() -> { 
     for (int i = 0; i < NEXT_LOOP; i++) { 
      emitter.onNext(i); 
     } 
     return true; 
     }); 

     Future<Boolean> future2 = service.submit(() -> { 
     for (int i = 0; i < NEXT_LOOP; i++) { 
      emitter.onNext(i); 
     } 
     return true; 
     }); 

     if (future1.get(1, TimeUnit.SECONDS) 
      && future2.get(1, TimeUnit.SECONDS)) { 
     emitter.onComplete(); 
     } 
    }, BackpressureStrategy.BUFFER) 
     .serialize() 
     .cast(Integer.class) 
     .subscribe(new Subscriber<Integer>() { 

      private int count = 0; 

      @Override 
      public void onSubscribe(Subscription s) { 
      s.request(Long.MAX_VALUE); 
      } 

      @Override 
      public void onNext(Integer t) { 
      count++; 
      } 

      @Override 
      public void onError(Throwable t) { 
      fail(t.getMessage()); 
      } 

      @Override 
      public void onComplete() { 
      setCount(count); 
      } 
     }); 

    assertThat(count, is(NEXT_LOOP * 2)); 
    } 
} 

Интересно ли serialize() не работает или я missunderstood использование serialize()

Я проверил источник SerializedSubscriber.

@Override 
public void onNext(T t) { 
    ... 
    synchronized(this){ 
    ... 
    } 
    actual.onNext(t); 
    emitLoop(); 
} 

Поскольку actual.onNext(t); называют из синхронизированного блока, я полагаю, что actual.onNext(t); может быть вызван из разных потоков одновременно. Кроме того, можно позвонить по телефону onComplete, прежде чем onNext будет сделано, я думаю.

Я использовал RxJava 2.0.4.

+0

Вы должны подать сообщение об ошибке с проектом: https://github.com/ReactiveX/RxJava/issues/new – spierce7

+0

@ spierce7 Прежде чем я сделаю эту проблему, я хочу убедиться, что это ошибка, поскольку есть возможность мой misundaerstanding.of использования. Я не очень хорошо владею английским языком. – arching

ответ

1

Это не ошибка, но misuse из FlowableEmitter:

onNext, OnError и OnComplete методы должны быть вызваны в последовательном порядке, так же, как методы Абонента. Используйте serialize(), если вы хотите это сделать. Другие методы являются потокобезопасными.

FlowableEmitter.serialize()

Применение Flowable.serialize() слишком поздно для оператора create.