2014-11-14 6 views
2

В RxJs существует метод под названием Observable.expand, который может рекурсивно расширять последовательность с помощью функции преобразования.Что такое эквивалент RxJava Observable.expand() в RxJs?

Например,

Rx.Observable.return(0).expand(function (x) { return Rx.Observable.return(x+1); }) 

будет излучать все целые числа

Однако я не могу найти этот метод в RxJava. Есть ли другие методы в RxJava для достижения аналогичной цели?

Для получения более подробной спецификации exapand(), см https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/expand.md

ответ

2

Ну, пример вы дали может быть упрощена:

Observable.range(0, Integer.MAX_VALUE) 

, но я предполагаю, что вы действительно хотите сделать что-то более сложное. scan не идентичен тому, что вы ищете, но он мог бы делать подобные вещи, и мы могли бы использовать его для создания Transformer, который можно было бы использовать аналогично расширению.

Основное отличие от scan состоит в том, что для каждого шага требуется новое входное значение, но, как и расширение, оно сохраняет прежнее значение. Мы можем получить это, просто проигнорировав новое значение ввода. Поскольку сканирование аналогично расширению, я собираюсь начать с примера scan, в котором есть некоторые довольно большие недостатки, а затем изучить лучший вариант.

public class Expand<T, T> implements Transformer<T, T> { 
    private final Func1<T, T> expandFunc; 

    public Expand(final Func1<T, T> expandFunc) { 
    this.initialValue = initialValue; 
    this.expandFunc = expandFunc; 
    } 

    @Override 
    public Observable<T> call(Observable<T> source) { 
    // Here we treat emissions from the source as a new 'initial value'. 

    // NOTE: This will effectively only allow one input from the source, since the 
    // output Observable expands infinitely. If you want it to switch to a new expanded 
    // observable each time the source emits, use switchMap instead of concatMap. 
    return source.concatMap(new Func1<T, Observable<T>>() { 

     @Override 
     public Observable<T> call(T initialValue) { 
     // Make an infinite null Observable, just for our next-step signal. 
     return Observable.<Void>just(null).repeat() 
      .scan(initialValue, new Func2<T, Void, T>() { 

       @Override 
       public T call(final T currentValue, final Void unusedSignal) { 
       return expandFunc.call(currentValue); 
       } 

      }); 
     } 

    }); 
    } 
} 

Чтобы использовать этот трансформатор, давайте сделаем метод, который принимает текущее число, добавляет 1 и квадратизирует его.

Observable.just(1).compose(new Expand(new Func1<Integer, Integer>() { 

    @Override 
    public Integer call(final Integer input) { 
    final Integer output = input + 1; 
    return output * output; 
    } 

}); 

В любом случае, возможно, вы заметили некоторые из главных неудобных точек этого подхода. Во-первых, есть проблема с коммутатором и concatMap, и как это по существу превращает один элемент из вывода Observable в бесконечную цепочку. Во-вторых, весь сигнал «Пустоты» Observable не должен быть необходим. Конечно, мы могли бы использовать range или just(1).repeat() или множество других вещей, но тем не менее они в конечном итоге будут выброшены.

Вот как вы могли бы моделировать его более чисто и рекурсивно.

public static <T> Observable<T> expandObservable(
    final T initialValue, final Func1<T, T> expandFunc) { 
    return Observable.just(initialValue) 
     .concatWith(Observable.defer(new Func0<Observable<T>>() { 

     @Override 
     public Observable<T> call() { 
      return expandObservable(expandFunc.call(initialValue), expandFunc); 
     } 

     }); 
} 

Таким образом, в этом примере, каждый рекурсивный проход выводит текущее значение (расширенное на каждом шаге, и concats к следующему шагу. defer используется для поддержания бесконечной рекурсии происходило сразу же, так как он не делает . вызова кода для создания Observable, пока он не выписывает Используя это выглядит следующим образом:

expandObservable(1, new Func1<Integer, Integer>() { 

    @Override 
    public Integer call(final Integer input) { 
    final Integer output = input + 1; 
    return output * output; 
    } 

}).subscribe(/** do whatever */); 

Таким образом, так же, как compose например, но гораздо опрятнее и чистого осуществления

Надежда, что помогает. ,

0

Я думаю, что вы ищете map:

https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#map

numbers = Observable.from([1, 2, 3, 4, 5]); 

numbers.map({it * it}).subscribe(
    { println(it); },       // onNext 
    { println("Error: " + it.getMessage()); }, // onError 
    { println("Sequence complete"); }   // onCompleted 
); 

1 
4 
9 
16 
25 
Sequence complete 
+2

Извините, но карта не совпадает с расширением. Доза карты не имеет рекурсивной семантики, как расширяется.Я редактировал свой вопрос, чтобы сделать его более понятным. – xwk