Я в процессе миграции AsyncTaskLoader
в RxJava, пытаясь понять все подробности о RxJava подход к параллелизму. Простые вещи работает нормально, но я борюсь со следующим кодом:Код работает на основном потоке, даже с subscribeOn указан
Это метод верхнего уровня, который запускается на выполнение:
mCompositeDisposable.add(mDataRepository
.getStuff()
.subscribeOn(mSchedulerProvider.io())
.subscribeWith(...)
mDataRepository.getStuff() выглядит следующим образом:
public Observable<StuffResult> getStuff() {
return mDataManager
.listStuff()
.flatMap(stuff -> Observable.just(new StuffResult(stuff)))
.onErrorReturn(throwable -> new StuffResult(null));
И последний слой:
public Observable<Stuff> listStuff() {
Log.d(TAG, ".listStuff() - "+Thread.currentThread().getName());
String sql = <...>;
return mBriteDatabase.createQuery(Stuff.TABLE_NAME, sql).mapToList(mStuffMapper);
}
Так что с выше коде log
распечатает .listStuff() - main
, что не совсем то, что я ищу. И я не совсем уверен, почему. У меня сложилось впечатление, что, установив subscribeOn
, каждое событие, извлеченное из цепочки, будет обработано в потоке, указанном в методе subscribeOn.
То, что я думаю, происходит из-за того, что код источника-aka-final-layer, до достижения mBriteDatabase
, не из мира RxJava и поэтому не является событием, пока не вызывается createQuery
. Значит, мне, наверное, нужна какая-то обертка? Я пытался применять .fromCallable
, однако это обертка для не коды Rx, и мой уровень базы данных возвращает наблюдаемый ...
Спасибо! Я запутался и перепробовал что-то очень простое. – vkislicins