2016-11-20 2 views
0

У меня есть Observable<Item>(А), который излучает его элементы каждый раз, когда PublishSubject<Item>(В) имеет новый Item.RxJava2: применить заданный планировщик для всего потока?

затем используется, например, как A.subscribeOn(computationScheduler).observeOn(mainThread)


Цель: Иметь полный поток от делать свою работу на вычислительном-планировщиком, а затем потреблять результат на основной-нить.

Фактический: В зависимости от того, где находится B, весь поток будет работать на разных планировщиках соответственно. В нижеследующем примере - по основному потоку, даже через вызов .subscribeOn().

Как я могу «заставить» полный поток из А выполнять свою работу над данным планировщиком и отправлять результаты по другому? A.compose() не делает трюк.


Фактический код:

class SomeClass 

private final PublishSubject<ExerciseQueryOptions> queryOptionsPublishSubject = PublishSubject.create(); 

@NonNull 
@Override 
public Observable<List<ExerciseViewModel>> call() { 
    return queryOptionsPublishSubject 
      .startWith(createQueryOptions()) 
      .distinctUntilChanged() 
      .flatMap(new Function<ExerciseQueryOptions, ObservableSource<List<ExerciseViewModel>>>() { 
       @Override 
       public ObservableSource<List<ExerciseViewModel>> apply(ExerciseQueryOptions queryOptions) throws Exception { 
        //This is the code I want to run on a given scheduler, 
        //Supplied by the code that calls this .call() method. 
        return datastore.queryAll(ExerciseModel.class, true) 
          .map(transformer);      
       } 
      }); 
} 

//Other class 
SomeClass A; 

A.subscribeOn(Schedulers.computation()).observeOn(AndroidScheduers.mainThread()) 
.subcribe(...); 

ответ

0

Каждый элемент вашего flatMap должен быть планировщиком:

   return datastore.queryAll(ExerciseModel.class, true) 
         .subscribeOn(Schedulers.computation()) 
         .map(transformer);      

Вы также должны иметь весь поток подписаться на вычисления планировщиком, в противном случае начальная подписка будет выполняться в текущем потоке.