2016-09-09 2 views
0

Я пытаюсь создать Observable так, чтобы он загружал некоторые данные из сети с интервалом и всякий раз, когда пользователь обновляет страницу. Это суть того, что я до сих пор:RxAndroid Наблюдаемый запуск на неожиданной теме

PublishSubject<Long> refreshSubject = PublishSubject.create(); 
Observable<MyDataType> observable = Observable.merge(
    Observable.interval(0, 3, TimeUnit.SECONDS), 
    refreshSubject 
) 
.flatMap(t -> { 
    // network operations that eventually return a value 
    // these operations are not observables themselves 
    // they are fully blocking network operations 
}) 
.subscribeOn(Schedulers.io()) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(data -> { 
    // update ui with data 
}, error -> { 
    // do something with error 
}); 

Позже в обновления обратного вызова у меня есть:

refreshSubject.onNext(0L); 

Он работает на интервале штрафа, однако, когда я обновить, она взрывается с NetworkOnMainThreadException , Я думал, что обработал это с помощью subscribeOn/observeOn. Что мне не хватает? Кроме того, почему это не приводит к сбою, когда Observer запускается из интервала?

ответ

4

Вы должны изменить subscribeOn(Schedulers.io()) на observeOn(Schedulers.io()) и переместить его по карте. Причина этого заключается в том, что ваш refreshSubject является PublishSubject, который является наблюдаемым и наблюдателем.

Так как onNext() этого PublishSubject вызывается внутри интерна. Наблюдается первым, прежде чем результат будет доставлен в вашу подписку. Это также причина того, что он работает, когда вы просто используете свой наблюдаемый (и тот факт, что interval всегда подписывается на поток вычислений по умолчанию).

Просто проверьте вывод этих двух фрагментах:

Observable.merge(
    Observable.interval(0, 3, TimeUnit.SECONDS), 
    refreshSubject 
) 
.observeOn(Schedulers.io()) 
.doOnNext(aLong -> Log.d("Thread", Thread.currentThread().toString())) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(data -> { 
    Log.d("Subscribe Thread", Thread.currentThread().toString()); 
}, error -> { 
       // do something with error 
      }); 

против

Observable.merge(
    Observable.interval(0, 3, TimeUnit.SECONDS), 
    refreshSubject 
) 
.doOnNext(aLong -> Log.d("Thread", Thread.currentThread().toString())) 
.subscribeOn(Schedulers.io()) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(data -> { 
    Log.d("Subscribe Thread", Thread.currentThread().toString()); 
}, error -> { 
    // do something with error 
}); 
+0

Ваше предложение работал, но я немного смущен этим утверждением: «С onNext() этого PublishSubject вызывается внутри интерьера. Наблюдается первым, прежде чем результат будет доставлен в вашу подписку ». Что такое "intern Observable"? – mrobinson7627