Скажем, у меня есть метод, подобный приведенному ниже коду, в котором List flatMapped для отдельных строк, каждый из которых имеет некоторую дорогостоящую операцию, применяемую к ним. Есть ли способ сравнить дорогостоящие операции, так же, как я использовал бы parallelStream() в Java 8?Можно ли параллельно выполнять операцию, управляемую плоской матрицей?
final List<String> names = new ArrayList<String>() {{
add("Ringo");
add("John");
add("Paul");
add("George");
}};
Observable.just(names).subscribeOn(Schedulers.io())
.flatMap(new Func1<List<String>, Observable<String>>() {
@Override
public Observable<String> call(final List<String> names) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (String name : names) {
subscriber.onNext(name);
}
}
});
}
})
.map(new Func1<String, String>() {
@Override
public String call(String s) {
//Simulate expensive operation
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s.toUpperCase();
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.v("RXExample", s + " on " + Thread.currentThread().getName());
}
});
Для завершения использования применяемое изменение в ответе выглядит следующим образом и прекрасно работает!
final List<String> names = new ArrayList<String>() {{
add("Ringo");
add("John");
add("Paul");
add("George");
}};
Observable.just(names).subscribeOn(Schedulers.io())
.flatMap(new Func1<List<String>, Observable<String>>() {
@Override
public Observable<String> call(final List<String> names) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(final Subscriber<? super String> subscriber) {
for (final String name : names) {
Observable
.just(name)
.subscribeOn(Schedulers.from(Executors.newFixedThreadPool(5)))
.map(new Func1<String, String>() {
@Override
public String call(String s) {
//Simulate expensive operation
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s.toUpperCase();
}
}).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
subscriber.onNext(name);
}
});
}
}
});
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.v("RXExample", s + " on " + Thread.currentThread().getName());
}
});
Классный, спасибо! Я применил его к своему фрагменту кода, чтобы показать, как он будет выглядеть в AndroidRX. –