Ну, пример вы дали может быть упрощена:
Observable.range(0, Integer.MAX_VALUE)
, но я предполагаю, что вы действительно хотите сделать что-то более сложное. scan
не идентичен тому, что вы ищете, но он мог бы делать подобные вещи, и мы могли бы использовать его для создания Transformer
, который можно было бы использовать аналогично расширению.
Основное отличие от scan
состоит в том, что для каждого шага требуется новое входное значение, но, как и расширение, оно сохраняет прежнее значение. Мы можем получить это, просто проигнорировав новое значение ввода. Поскольку сканирование аналогично расширению, я собираюсь начать с примера scan
, в котором есть некоторые довольно большие недостатки, а затем изучить лучший вариант.
public class Expand<T, T> implements Transformer<T, T> {
private final Func1<T, T> expandFunc;
public Expand(final Func1<T, T> expandFunc) {
this.initialValue = initialValue;
this.expandFunc = expandFunc;
}
@Override
public Observable<T> call(Observable<T> source) {
// Here we treat emissions from the source as a new 'initial value'.
// NOTE: This will effectively only allow one input from the source, since the
// output Observable expands infinitely. If you want it to switch to a new expanded
// observable each time the source emits, use switchMap instead of concatMap.
return source.concatMap(new Func1<T, Observable<T>>() {
@Override
public Observable<T> call(T initialValue) {
// Make an infinite null Observable, just for our next-step signal.
return Observable.<Void>just(null).repeat()
.scan(initialValue, new Func2<T, Void, T>() {
@Override
public T call(final T currentValue, final Void unusedSignal) {
return expandFunc.call(currentValue);
}
});
}
});
}
}
Чтобы использовать этот трансформатор, давайте сделаем метод, который принимает текущее число, добавляет 1 и квадратизирует его.
Observable.just(1).compose(new Expand(new Func1<Integer, Integer>() {
@Override
public Integer call(final Integer input) {
final Integer output = input + 1;
return output * output;
}
});
В любом случае, возможно, вы заметили некоторые из главных неудобных точек этого подхода. Во-первых, есть проблема с коммутатором и concatMap, и как это по существу превращает один элемент из вывода Observable в бесконечную цепочку. Во-вторых, весь сигнал «Пустоты» Observable не должен быть необходим. Конечно, мы могли бы использовать range
или just(1).repeat()
или множество других вещей, но тем не менее они в конечном итоге будут выброшены.
Вот как вы могли бы моделировать его более чисто и рекурсивно.
public static <T> Observable<T> expandObservable(
final T initialValue, final Func1<T, T> expandFunc) {
return Observable.just(initialValue)
.concatWith(Observable.defer(new Func0<Observable<T>>() {
@Override
public Observable<T> call() {
return expandObservable(expandFunc.call(initialValue), expandFunc);
}
});
}
Таким образом, в этом примере, каждый рекурсивный проход выводит текущее значение (расширенное на каждом шаге, и concats к следующему шагу. defer
используется для поддержания бесконечной рекурсии происходило сразу же, так как он не делает . вызова кода для создания Observable, пока он не выписывает Используя это выглядит следующим образом:
expandObservable(1, new Func1<Integer, Integer>() {
@Override
public Integer call(final Integer input) {
final Integer output = input + 1;
return output * output;
}
}).subscribe(/** do whatever */);
Таким образом, так же, как compose
например, но гораздо опрятнее и чистого осуществления
Надежда, что помогает. ,
Извините, но карта не совпадает с расширением. Доза карты не имеет рекурсивной семантики, как расширяется.Я редактировал свой вопрос, чтобы сделать его более понятным. – xwk