2016-02-10 4 views
3

При написании задания синхронизации данных с RxJava я обнаружил странное поведение, которое я не могу объяснить. Я довольно новичок в RxJava и буду признателен за помощь.RxJava flatMap и противодавление странное поведение

Вкратце моя работа довольно проста. У меня есть список идентификаторов элементов, я вызываю веб-сервис, чтобы получить каждый элемент по ID, выполнить некоторую обработку и выполнить множественный вызов для передачи данных в БД. Загрузка данных происходит быстрее, чем хранение данных, поэтому я обнаружил ошибки OutOfMemory.

Мой код в значительной степени похожи на «провал» тест, но затем doning некоторое испытание, я понял, что удаление строки:

flatMap(dt -> Observable.just(dt)) 

Сделать работу. Неудачный тестовый результат показывает, что непонятые элементы складываются, и это приводит к OutOfMemory. Рабочий тестовый результат показывает, что производитель всегда будет ждать пользователя, поэтому это никогда не приведет к OutOfMemory.

public static class DataStore { 
    public Integer myVal; 
    public byte[] myBigData; 

    public DataStore(Integer myVal) { 
     this.myVal = myVal; 
     this.myBigData = new byte[1000000]; 
    } 
} 

@Test 
public void working() { 
    int MAX_CONCURRENT_LOAD = 1; 
    int MAX_CONCURRENT_STORE = 2; 

    AtomicInteger nbUnconsumed = new AtomicInteger(0); 

    List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList()); 
    Observable.from(ids) 
     .flatMap(this::produce, MAX_CONCURRENT_LOAD) 
     .doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet())) 
     .flatMap(this::consume, MAX_CONCURRENT_STORE) 
     .doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet())) 
     .toBlocking().forEach(s -> {}); 

    logger.info("Finished"); 
} 

@Test 
public void failing() { 
    int MAX_CONCURRENT_LOAD = 1; 
    int MAX_CONCURRENT_STORE = 2; 

    AtomicInteger nbUnconsumed = new AtomicInteger(0); 

    List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList()); 
    Observable.from(ids) 
     .flatMap(this::produce, MAX_CONCURRENT_LOAD) 
     .doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet())) 
     .flatMap(dt -> Observable.just(dt)) 
     .flatMap(this::consume, MAX_CONCURRENT_STORE) 
     .doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet())) 
     .toBlocking().forEach(s -> {}); 

    logger.info("Finished"); 
} 

private Observable<DataStore> produce(final int value) { 
    return Observable.<DataStore>create(s -> { 
     try { 
      if (!s.isUnsubscribed()) { 
       Thread.sleep(200); //Here I synchronous call WS to retrieve data 
       s.onNext(new DataStore(value)); 
       s.onCompleted(); 
      } 
     } catch (Exception e) { 
      s.onError(e); 
     } 
    }).subscribeOn(Schedulers.io()); 
} 

private Observable<Boolean> consume(DataStore value) { 
    return Observable.<Boolean>create(s -> { 
     try { 
      if (!s.isUnsubscribed()) { 
       Thread.sleep(1000); //Here I synchronous call DB to store data 
       s.onNext(true); 
       s.onCompleted(); 
      } 
     } catch (Exception e) { 
      s.onNext(false); 
      s.onCompleted(); 
     } 
    }).subscribeOn(Schedulers.io()); 
} 

Что объясняет такое поведение? Как я могу решить свой неудачный тест, не удаляя Observable.just (dt)), который в моем реальном случае является Observable.from (someListOfItme)

ответ

5

flatMap по умолчанию объединяет неограниченное количество источников и применяет эту конкретную лямбда без maxConcurrent, вы по существу не ограничены восходящим потоком, который теперь может работать на полной скорости, подавляя внутренние буферы других операторов.

+0

Хорошо, я начинаю понимать. Поэтому я должен заменить его flatMap (dt -> Observable.just (dt), SOME_PRODUCER_BUFFER_SIZE)? –

+0

Да, желательно, MAX_CONCURRENT_LOAD – akarnokd

+0

Да, но я думаю, что в моем случае мне было бы интересно, чтобы буфер производителя был больше, чем максимальная одновременная загрузка, поэтому, если иногда хранение быстрее, у меня есть буфер, чтобы каждый поток был занят, иначе потребитель будет Подождите. Это правильно? –

 Смежные вопросы

  • Нет связанных вопросов^_^