У меня есть Observable<Item>
(А), который излучает его элементы каждый раз, когда PublishSubject<Item>
(В) имеет новый Item
.RxJava2: применить заданный планировщик для всего потока?
затем используется, например, как A.subscribeOn(computationScheduler).observeOn(mainThread)
Цель: Иметь полный поток от делать свою работу на вычислительном-планировщиком, а затем потреблять результат на основной-нить.
Фактический: В зависимости от того, где находится B, весь поток будет работать на разных планировщиках соответственно. В нижеследующем примере - по основному потоку, даже через вызов .subscribeOn()
.
Как я могу «заставить» полный поток из А выполнять свою работу над данным планировщиком и отправлять результаты по другому? A.compose() не делает трюк.
Фактический код:
class SomeClass
private final PublishSubject<ExerciseQueryOptions> queryOptionsPublishSubject = PublishSubject.create();
@NonNull
@Override
public Observable<List<ExerciseViewModel>> call() {
return queryOptionsPublishSubject
.startWith(createQueryOptions())
.distinctUntilChanged()
.flatMap(new Function<ExerciseQueryOptions, ObservableSource<List<ExerciseViewModel>>>() {
@Override
public ObservableSource<List<ExerciseViewModel>> apply(ExerciseQueryOptions queryOptions) throws Exception {
//This is the code I want to run on a given scheduler,
//Supplied by the code that calls this .call() method.
return datastore.queryAll(ExerciseModel.class, true)
.map(transformer);
}
});
}
//Other class
SomeClass A;
A.subscribeOn(Schedulers.computation()).observeOn(AndroidScheduers.mainThread())
.subcribe(...);