2014-12-11 4 views
0

Я использую rxjava для задач обслуживания следующим образом:Как испускать результаты с использованием таймера Observable с flatMap

В классе, где мне нужно регулярное техническое обслуживание, я использую следующую статическую подписку, что приводит в Observable запускается в первый раз, когда класс загружается в память, а затем с регулярными интервалами, как указано.

private static Subscription subscription = Observable.timer(0, 5, TimeUnit.SECONDS) 
     .flatMap(new Func1<Long, Observable<String>>() { 
      @Override public Observable<String> call(Long aLong) { 

       // some code 

       return Observable.just(null); 
      } 
     }).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()) 
     .subscribe(); 

Теперь у меня есть ситуация, когда я хочу сообщить о результатах моего обслуживания в пользовательский интерфейс.

Обычно я хотел бы использовать следующую схему

Observable.create(new Observable.OnSubscribe<String>() { 

     @Override public void call(Subscriber<? super String> subscriber) { 

      // some code 

      subscriber.onNext(result); 
      subscriber.onCompleted();   } 

    }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(new Action1<String>() { 

       @Override public void call(String result) { 

        // write to the UI 

       } 

      }); 

, но здесь мы имеем Наблюдаемые, которая выполняется только один раз.

Для наблюдения, которое выполняется через регулярные промежутки времени, я не смог найти способ вызова «Действие» в «Абоненте», чтобы передать результат с помощью функции «подписчик.onNext()». Похоже, что для Observable нет подходящей подписи, которая может длиться с таймера() и в то же время позволяет подписаться с действием. Но, зная rxjava, я уверен, что есть трюк ;-)

Я мог бы сделать эту работу, используя zip, чтобы застегнуть таймер Наблюдаемый и одноразовый Observable (в основном, скрепляющий две версии вместе), но я предпочел бы использовать потому что он ведет себя несколько иначе.

-

Я попытался объединить обе версии в один следующим образом:

private static Subscription subscription = Observable.timer(0, 5, TimeUnit.SECONDS) 
     .flatMap(new Func1<Long, Observable<String>>() { 
      @Override public Observable<String> call(Long aLong) { 

       // some code // stays here to ensure there is no concurrency while executing 

       final String result = "result"; // I store the result in a final variable after some code has been finished 

       return Observable.create(new Observable.OnSubscribe<String>() { 
        @Override public void call(Subscriber<? super String> subscriber) { 
         subscriber.onNext(result); // then I use it in a new Observable and emit it 
         subscriber.onCompleted(); // not sure if this is needed here (haven't tested this yet) 
        } 
       }); 

      } 
     }).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()) 
     .subscribe(new Action1<String>() { 
      @Override public void call(String result) { 
       // so I can finally consume the result on the UI thread 
      } 
     }); 

Вместо создания и излучающих «нулевой» Observable, я создаю тот, который позволяет мне послать результат для абонента.

Довольно грязно, но это должно работать, не так ли? Любое более простое решение? Что ты думаешь?

+0

Почему 'Observable.timer.flatMap' не работает? – zsxwing

+0

Я считаю, что это не сработает, потому что flatMap сглаживает свои испускаемые Observables в единый Observable (что я хочу), но с этим я не нахожу подпись, которая дает мне интерфейс с подписчиком [например, @Override public void call (Абонент подписчик)], поэтому я могу использовать subscriber.onNext(). –

ответ

1

Я не уверен, что понимаю вашу проблему.

Вы хотите использовать ту же подписку с вашего наблюдаемого таймера и вашего наблюдаемого?

Возможно, вы можете использовать Субъект, чтобы действовать как мост между вашими Наблюдателями.

Subject<?, ?> bridge = PublishSubject.create(); 
Observable.timer(5, SECONDS).flatMap(/* whatever*/).mergeWith(bridge).subscribe(/* toDo */); 
Observable.create(/* subscription */).subscribe(bridge); 

Каждый элемент излучает ваш «Один раз наблюдаемого» будет выталкиваться в Brige, что будет толкать его в «таймер» наблюдаемой.

Это вы что искали?

+0

[Я никогда не использовал тему/мост, поэтому мне нужно прочитать и попробовать это, прежде чем я смогу сказать, если это сработает. Я вернусь к этому через некоторое время.] –

+0

Я добавил версию выше, где я объединил оба фрагмента кода. Без тестирования я считаю, что это должно работать. С PublishSubject я думаю, что мы тоже на хорошем пути, и, возможно, можем решить проблему намного проще, чем то, что я сделал выше.Но из вашего объяснения «Каждый элемент, излучаемый вашим« Одноразовым наблюдением », будет вдавлен в бридж, который подталкивает его к наблюдаемому« таймеру ». похоже, вы неправильно поняли, так как я не хочу объединять два Observables. Я только намерен использовать Timer Observable в качестве часов, и мне не нужна его ценность. –

+0

ОК. Возможно, вы хотите закрепить свой таймер другим наблюдаемым. Во всяком случае, я думаю, вы поняли этот трюк. ;) – dwursteisen

 Смежные вопросы

  • Нет связанных вопросов^_^