2016-03-21 7 views
3

В соответствии с this thread conCatMap и карта только отличаются по порядку, в котором исчисляются элементы. Поэтому я сделал тест и создал простой поток целых чисел и хотел посмотреть, в каком порядке они будут испускаться. Я сделал небольшое наблюдение, которое будет принимать цифры в диапазоне от 1 до 5 и несколько их на два. Легко.RxJava - flatmap vs concatMap - почему вы заказываете то же самое по подписке?

Вот код с flatmap:

myObservable.flatMap(new Func1<Integer, Observable<Integer>>() { 
     @Override 
     public Observable<Integer> call(Integer integer) { 
      return Observable.just(integer * 2); 
     } 
    }).subscribe(new Observer<Integer>() { 
     @Override 
     public void onCompleted() { 

     } 

     @Override 
     public void onError(Throwable e) { 

     } 

     @Override 
     public void onNext(Integer integer) { 
     Log.v("myapp","from flatMap:"+integer); 
     } 
    }); 

и тот же самый код, используя concatMap:

myObservable.concatMap(new Func1<Integer, Observable<Integer>>() { 
     @Override 
     public Observable<Integer> call(Integer integer) { 
      return Observable.just(integer * 2); 
     } 
    }).subscribe(new Observer<Integer>() { 
     @Override 
     public void onCompleted() { 

     } 

     @Override 
     public void onError(Throwable e) { 

     } 

     @Override 
     public void onNext(Integer integer) { 
     Log.v("myapp","from concatmap:"+integer); 
     } 
    }); 

, когда я увидел печать в журналах упорядоченность была одинаковой для обоих, почему ? Я думал, что только concatMap сохранит порядок?

ответ

9

То, что вы видите, является совпадением. Каждый раз, когда ваш flatMap возвращает значение, он делает это в том же потоке, что и предыдущий.

Я изменил свой пример, чтобы воспользоваться преимуществами многопоточностей:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) 
     .flatMap(integer -> Observable.just(integer) 
       .observeOn(Schedulers.computation()) 
       .flatMap(i -> { 
        try { 
         Thread.sleep(new Random().nextInt(1000)); 
         return Observable.just(2 * i); 
        } catch (InterruptedException e) { 
         e.printStackTrace(); 
         return Observable.error(e); 
        } 
       })) 
     .subscribe(System.out::println, 
       Throwable::printStackTrace, 
       () -> System.out.println("onCompleted")); 

Я задерживая каждый 2 * i значения случайной задержкой, чтобы заставить другой порядок. Кроме того, до этого я добавил observeOn(Schedulers.computation()), поэтому следующий оператор (flatMap) работает на пуле вычислений - это делает многопоточную магию.

Это выход я получаю на моем примере (на Android):

I/System.out: 6 
I/System.out: 4 
I/System.out: 12 
I/System.out: 14 
I/System.out: 8 
I/System.out: 2 
I/System.out: 16 
I/System.out: 20 
I/System.out: 10 
I/System.out: 18 
I/System.out: onCompleted 

Если я заменяю flatMap после just с concatMap, то я получаю правильно упорядоченный выход.

Существует great post by Thomas Nield с надлежащим объяснением.

+0

Не могли бы вы исправить ссылку? Должно ли это быть https://tomstechnicalblog.blogspot.com/2015/11/rxjava-achieving-parallelization.html? –

+0

@ XiaoPeng сделано, спасибо –