0

Скажем, у меня есть метод, подобный приведенному ниже коду, в котором List flatMapped для отдельных строк, каждый из которых имеет некоторую дорогостоящую операцию, применяемую к ним. Есть ли способ сравнить дорогостоящие операции, так же, как я использовал бы parallelStream() в Java 8?Можно ли параллельно выполнять операцию, управляемую плоской матрицей?

final List<String> names = new ArrayList<String>() {{ 
     add("Ringo"); 
     add("John"); 
     add("Paul"); 
     add("George"); 
    }}; 

    Observable.just(names).subscribeOn(Schedulers.io()) 
      .flatMap(new Func1<List<String>, Observable<String>>() { 
       @Override 
       public Observable<String> call(final List<String> names) { 
        return Observable.create(new Observable.OnSubscribe<String>() { 
         @Override 
         public void call(Subscriber<? super String> subscriber) { 
          for (String name : names) { 
           subscriber.onNext(name); 
          } 
         } 
        }); 
       } 
      }) 
      .map(new Func1<String, String>() { 
       @Override 
       public String call(String s) { 
        //Simulate expensive operation 
        try { 
         Thread.sleep(6000); 
        } catch (InterruptedException e) { 
         e.printStackTrace(); 
        } 
        return s.toUpperCase(); 
       } 
      }).subscribe(new Subscriber<String>() { 
     @Override 
     public void onCompleted() { 

     } 

     @Override 
     public void onError(Throwable e) { 

     } 

     @Override 
     public void onNext(String s) { 
      Log.v("RXExample", s + " on " + Thread.currentThread().getName()); 
     } 
    }); 

Для завершения использования применяемое изменение в ответе выглядит следующим образом и прекрасно работает!

final List<String> names = new ArrayList<String>() {{ 
     add("Ringo"); 
     add("John"); 
     add("Paul"); 
     add("George"); 
    }}; 

    Observable.just(names).subscribeOn(Schedulers.io()) 
      .flatMap(new Func1<List<String>, Observable<String>>() { 
       @Override 
       public Observable<String> call(final List<String> names) { 
        return Observable.create(new Observable.OnSubscribe<String>() { 
         @Override 
         public void call(final Subscriber<? super String> subscriber) { 
          for (final String name : names) { 
           Observable 
             .just(name) 
             .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(5))) 
             .map(new Func1<String, String>() { 
              @Override 
              public String call(String s) { 
               //Simulate expensive operation 
               try { 
                Thread.sleep(6000); 
               } catch (InterruptedException e) { 
                e.printStackTrace(); 
               } 
               return s.toUpperCase(); 
              } 
             }).subscribe(new Observer<String>() { 
            @Override 
            public void onCompleted() { 

            } 

            @Override 
            public void onError(Throwable e) { 

            } 

            @Override 
            public void onNext(String s) { 
             subscriber.onNext(name); 
            } 
           }); 
          } 
         } 
        }); 
       } 
      }) 
      .subscribe(new Subscriber<String>() { 
     @Override 
     public void onCompleted() { 

     } 

     @Override 
     public void onError(Throwable e) { 

     } 

     @Override 
     public void onNext(String s) { 
      Log.v("RXExample", s + " on " + Thread.currentThread().getName()); 
     } 
    }); 

ответ

0

Вы можете параллельно работать с плоской картой, как показано в следующем примере. Я использую RxJava2 для тестирования.

Для более подробного объяснения, пожалуйста, прочитайте использование flatMap здесь: http://tomstechnicalblog.blogspot.de/2015/11/rxjava-achieving-parallelization.html

@Test 
public void name() throws Exception { 
    final List<String> names = new ArrayList<String>() {{ 
     add("Ringo"); 
     add("John"); 
     add("Paul"); 
     add("George"); 
    }}; 

    Observable<String> stringObservable = Observable.fromIterable(names) 
      .flatMap(s -> { 
       return longWork(s).doOnNext(s1 -> { 
        printCurrentThread(s1); 
       }).subscribeOn(Schedulers.newThread()); 
      }); 

    TestObserver<String> test = stringObservable.test(); 

    test.awaitDone(2_000, TimeUnit.MILLISECONDS).assertValueCount(4); 
} 

private Observable<String> longWork(String s) throws InterruptedException { 
    return Observable.fromCallable(() -> { 
     Thread.sleep(1_000); 

     return s; 
    }); 
} 

private void printCurrentThread(String additional) { 
    System.out.println(additional + "_" + Thread.currentThread()); 
} 
+0

Классный, спасибо! Я применил его к своему фрагменту кода, чтобы показать, как он будет выглядеть в AndroidRX. –