Я относительно новичок в RxJava2, и у меня возникают некоторые странные поведения, поэтому вполне вероятно, что я использую инструмент не по-другому.RxJava2 flatMap создает повторяющиеся события
Это довольно большой проект, но я отделил фрагмент кода ниже как минимум воспроизводимым код:
Observable
.interval(333, TimeUnit.MILLISECONDS)
.flatMap(new Function<Long, ObservableSource<Integer>>() {
private Subject<Integer> s = PublishSubject.create();
private int val = 0;
@Override public ObservableSource<Integer> apply(Long aLong) throws Exception {
val++;
s.onNext(val);
return s;
}
})
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.w("value: %s", integer);
}
});
Этот код имитирует события из моей RX-потока с использованием .interval
и flatMap
получать эти события " выполните некоторую обработку »и использует Subject
для вывода результатов вниз по потоку.
Поток - это непрерывный процесс, который будет иметь несколько событий.
Этот минимальный код глупый, потому что я нажимаю только на обратный вызов apply
, но в реальном случае есть несколько возможных моментов, когда может произойти нажатие, а число событий, принимаемых во время apply
, - это не то же количество который будет отправлен через Subject.
То, что я ожидал увидеть этот код:
value: 2 // 1 got skipped because onNext is called before there's a subscriber.
value: 3
value: 4
value: 5
value: 6 ... etc
, что я на самом деле получил это:
value: 2
value: 3
value: 3 // 3 twice
value: 4
value: 4
value: 4 // 4 repeated 3 times
value: 5
value: 5
value: 5
value: 5 // 5 repeated 4 times
value: 6
value: 6
value: 6
value: 6
value: 6 // 6 repeated 5 times
... etc
Я также попытался иметь Observable<Integer> o = s.share();
и вернуть его, или вернуться непосредственно s.share();
с теми же результатами.
Я понимаю, почему это происходит. ObservableSource
снова подписались на n снова n, так что в каждом цикле больше событий.
Вопрос:
Как я могу достигнуть моего ожидаемого поведения?
(в случае, если мое ожидаемое поведение было не ясно, пожалуйста, спросите больше на комментарии)
Я бы попытался переместить 'private Subject s = PublishSubject.create();' в другой области, если бы я был в вас –
Blackbelt
@ Blackbelt, как я уже сказал. Это минимальный воспроизводимый код. В полном коде 'Function' есть свой класс. – Budius
Я могу судить о том, что вижу, и не могу догадаться, что вы написали. – Blackbelt