2017-02-16 9 views
0

Я в процессе миграции AsyncTaskLoader в RxJava, пытаясь понять все подробности о RxJava подход к параллелизму. Простые вещи работает нормально, но я борюсь со следующим кодом:Код работает на основном потоке, даже с subscribeOn указан

Это метод верхнего уровня, который запускается на выполнение:

mCompositeDisposable.add(mDataRepository 
      .getStuff() 
      .subscribeOn(mSchedulerProvider.io()) 
      .subscribeWith(...) 

mDataRepository.getStuff() выглядит следующим образом:

public Observable<StuffResult> getStuff() { 
    return mDataManager 
      .listStuff() 
      .flatMap(stuff -> Observable.just(new StuffResult(stuff))) 
      .onErrorReturn(throwable -> new StuffResult(null)); 

И последний слой:

public Observable<Stuff> listStuff() { 
     Log.d(TAG, ".listStuff() - "+Thread.currentThread().getName()); 
     String sql = <...>; 
     return mBriteDatabase.createQuery(Stuff.TABLE_NAME, sql).mapToList(mStuffMapper); 
} 

Так что с выше коде log распечатает .listStuff() - main, что не совсем то, что я ищу. И я не совсем уверен, почему. У меня сложилось впечатление, что, установив subscribeOn, каждое событие, извлеченное из цепочки, будет обработано в потоке, указанном в методе subscribeOn.

То, что я думаю, происходит из-за того, что код источника-aka-final-layer, до достижения mBriteDatabase, не из мира RxJava и поэтому не является событием, пока не вызывается createQuery. Значит, мне, наверное, нужна какая-то обертка? Я пытался применять .fromCallable, однако это обертка для не коды Rx, и мой уровень базы данных возвращает наблюдаемый ...

ответ

3

Вашего Log.d вызов происходит

  • сразу, когда listStuff вызывается
  • который сразу после получения getStuff
  • , который является первым, что происходит в фрагменте кода верхнего уровня, который вы нам показываете.

Если вам нужно сделать это, когда подписка произойдет, вы должны быть четко:

public Observable<Stuff> listStuff() { 
    String sql = <...>; 
    return mBriteDatabase.createQuery(Stuff.TABLE_NAME, sql) 
     .mapToList(mStuffMapper) 
     .doOnsubscribe(() -> Log.d(TAG, ".listStuff() - "+Thread.currentThread().getName())); 
} 
+0

Спасибо! Я запутался и перепробовал что-то очень простое. – vkislicins