При написании задания синхронизации данных с RxJava я обнаружил странное поведение, которое я не могу объяснить. Я довольно новичок в RxJava и буду признателен за помощь.RxJava flatMap и противодавление странное поведение
Вкратце моя работа довольно проста. У меня есть список идентификаторов элементов, я вызываю веб-сервис, чтобы получить каждый элемент по ID, выполнить некоторую обработку и выполнить множественный вызов для передачи данных в БД. Загрузка данных происходит быстрее, чем хранение данных, поэтому я обнаружил ошибки OutOfMemory.
Мой код в значительной степени похожи на «провал» тест, но затем doning некоторое испытание, я понял, что удаление строки:
flatMap(dt -> Observable.just(dt))
Сделать работу. Неудачный тестовый результат показывает, что непонятые элементы складываются, и это приводит к OutOfMemory. Рабочий тестовый результат показывает, что производитель всегда будет ждать пользователя, поэтому это никогда не приведет к OutOfMemory.
public static class DataStore {
public Integer myVal;
public byte[] myBigData;
public DataStore(Integer myVal) {
this.myVal = myVal;
this.myBigData = new byte[1000000];
}
}
@Test
public void working() {
int MAX_CONCURRENT_LOAD = 1;
int MAX_CONCURRENT_STORE = 2;
AtomicInteger nbUnconsumed = new AtomicInteger(0);
List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
Observable.from(ids)
.flatMap(this::produce, MAX_CONCURRENT_LOAD)
.doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
.flatMap(this::consume, MAX_CONCURRENT_STORE)
.doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
.toBlocking().forEach(s -> {});
logger.info("Finished");
}
@Test
public void failing() {
int MAX_CONCURRENT_LOAD = 1;
int MAX_CONCURRENT_STORE = 2;
AtomicInteger nbUnconsumed = new AtomicInteger(0);
List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
Observable.from(ids)
.flatMap(this::produce, MAX_CONCURRENT_LOAD)
.doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
.flatMap(dt -> Observable.just(dt))
.flatMap(this::consume, MAX_CONCURRENT_STORE)
.doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
.toBlocking().forEach(s -> {});
logger.info("Finished");
}
private Observable<DataStore> produce(final int value) {
return Observable.<DataStore>create(s -> {
try {
if (!s.isUnsubscribed()) {
Thread.sleep(200); //Here I synchronous call WS to retrieve data
s.onNext(new DataStore(value));
s.onCompleted();
}
} catch (Exception e) {
s.onError(e);
}
}).subscribeOn(Schedulers.io());
}
private Observable<Boolean> consume(DataStore value) {
return Observable.<Boolean>create(s -> {
try {
if (!s.isUnsubscribed()) {
Thread.sleep(1000); //Here I synchronous call DB to store data
s.onNext(true);
s.onCompleted();
}
} catch (Exception e) {
s.onNext(false);
s.onCompleted();
}
}).subscribeOn(Schedulers.io());
}
Что объясняет такое поведение? Как я могу решить свой неудачный тест, не удаляя Observable.just (dt)), который в моем реальном случае является Observable.from (someListOfItme)
Хорошо, я начинаю понимать. Поэтому я должен заменить его flatMap (dt -> Observable.just (dt), SOME_PRODUCER_BUFFER_SIZE)? –
Да, желательно, MAX_CONCURRENT_LOAD – akarnokd
Да, но я думаю, что в моем случае мне было бы интересно, чтобы буфер производителя был больше, чем максимальная одновременная загрузка, поэтому, если иногда хранение быстрее, у меня есть буфер, чтобы каждый поток был занят, иначе потребитель будет Подождите. Это правильно? –