2016-09-02 3 views
2

У меня проблема при использовании оператора RxJava concat. У меня есть два наблюдаемых, первые Издают результаты из базы данных сервера, а другие выдают результаты из локальной базы данных, а затем я Concat:Realm + Retrofit + RxJava: Concat и SubscribeOn

// Uses a Realm in the UI thread 
Observable<MyResult> remoteObservable = mRemoteDataSource.find(tId); 

// Uses Retrofit 
Observable<MyResult> localObservable = mLocalDataSource.find(tId); 

Observable.concat(localObservable, remoteObservable) 
    .doOnNext(result -> /* Do my stuff */) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .doOnError(throwable -> throwable.printStackTrace()) 
    .subscribe() 

Таким образом, это вызывает у меня проблемы, так как я не использую subscribeOn() в конкатенированный наблюдаемый работает на AndroidScheduler.MainThread(), и это не запускает пульт, и он запускает NetworkOnMainThreadException.

Если я реализую subscribeOn(Schedulers.computation()), я получаю Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created, поскольку, конечно, Observable не работает в потоке, экземпляр Realm существует.

Я искал в других вопросах, и я не получил ничего полезного, я проверил пример, сделанный по царству: https://github.com/realm/realm-java/blob/master/examples/rxJavaExample/src/main/java/io/realm/examples/rxjava/retrofit/RetrofitExample.java, но, как ни странно, я вижу, что модифицированный наблюдаемый подписывается ни о чем, и он работает.

Почему это работает на образце, и в моем коде я не могу сделать то же самое? Любое предложение?

+0

... это действительно ваш «локальный» наблюдаемый, который использует Retrofit? – EpicPandaForce

ответ

2

Я считаю, что вы должны использовать subscribeOn() в нужном месте.

// Uses a Realm in the UI thread 
Observable<MyResult> realmObservable = mRealmDataSource.find(tId).subscribeOn(AndroidSchedulers.mainThread()); 

// Uses Retrofit 
Observable<MyResult> retrofitObservable = mRetrofitDataSource.find(tId).subscribeOn(Subscribers.io()); 

Observable.concat(realmObservable, retrofitObservable) 
    .doOnNext(result -> /* Do my stuff */) 
    .subscribeOn(AndroidSchedulers.mainThread()) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .doOnError(throwable -> throwable.printStackTrace()) 
    .subscribe() 

Посмотрите, исправит ли он вашу проблему.

+0

Спасибо за это, я тоже предположил это и попробовал, но поведение моего приложения стало странным, и только первые наблюдаемые в concat работали. Я уверен, что делаю что-то еще, но я уже изменил свою первоначальную реализацию. Благодарю вас. –

2

Вы можете Concat локальные и удаленные наблюдаемыми, как показано ниже:

// Uses a Realm in the UI thread 
Observable<MyResult> remoteObservable = mRemoteDataSource.find(tId); 

// Uses Retrofit 
Observable<MyResult> localObservable = mLocalDataSource.find(tId); 

Observable.concat(localObservable, remoteObservable).first() 
       .map(new Func1<MyResult, MyResult>() { 
        @Override 
        public myResult call(MyResult result) { 
         if (result == null) { 
          throw new IllegalArgumentException(); 
         } 
         return result; 
        } 
       }); 

И подписываться, как показано ниже:

CompositeSubscription mCompositeSubscription = new CompositeSubscription(); 
final Subscription subscription = mRepo.find(tId 
       .subscribeOn(Schedulers.io()) 
       .observeOn(AndroidSchedulers.mainThread()) 
       .subscribe(new Observer<MyResult>() { 
        @Override 
        public void onCompleted() { 
         // Completed 
        } 

        @Override 
        public void onError(Throwable e) { 
         // onError 
        } 

        @Override 
        public void onNext(MyResult result) { 
         //onSuccess 
        } 
       }); 
mCompositeSubscription.add(subscription); 

Вы можете проверить этот репозиторий для RxJava + Модернизированный + Realm https://github.com/savepopulation/wikilight

Удачи!