2016-12-28 8 views
1

Я пытаюсь отказаться от подписки после получения первого элемента из наблюдаемого. И, похоже, это не работает. Что я делаю не так?RxJava: OnNext Отказаться от подписки не работает

public class ObservableAndSubscriber { 

    public static void main(final String[] args) { 
     final Observable<String> strObservable = Observable.create(s -> { 
      while (true) { 
       s.onNext("Hello World!!"); 
      } 
     }); 

     final Subscriber<String> strSubscriber = new Subscriber<String>() { 

      @Override 
      public void onCompleted() { 

      } 

      @Override 
      public void onError(final Throwable e) { 
       e.printStackTrace(); 

      } 

      @Override 
      public void onNext(final String t) { 
       System.out.println(t); 
       this.unsubscribe(); 

      } 
     }; 
     strObservable.subscribe(strSubscriber); 
    } 
} 

Результат, кажется, печатает «Hello World» в бесконечном цикле.

+0

http://stackoverflow.com/q/30046124/697313 - связанное обсуждение. –

ответ

0

Я сделал этот пример, когда читал книгу Томаша Нуркевича. Позже он получил аналогичный пример и объяснил, что не так с моим кодом.

Бесконечные петли являются злыми! Так как у меня было одно в моем наблюдаемом выражении лямбда-выражения, оно выполняется в контексте моего основного потока и подписывается на неограниченное количество блоков. Чтобы заставить наблюдаемый создать lambda выполнить в другом потоке, что означает, что основной поток никогда не блокируется и подписывается() завершается. Я попросил, чтобы подписка произошла в потоке ввода-вывода, и это сделало трюк, а это означает, что бесконечный цикл происходит в потоке ввода-вывода и не блокирует основной поток.

strObservable.subscribeOn(Schedulers.io()).subscribe(strSubscriber); 
+0

Я не думаю, что это решит проблему отказа от подписки. Добавьте 'Thread.sleep (1000)' в конце 'main()' для задержки завершения программы и посмотрите, как это происходит. –

1

Чтобы отказаться от подписки на работу, вам необходимо проверить статус подписки в вашем цикле. В противном случае он будет бесконечно истощать ваш процессор.

Самый простой способ справиться с бесконечными секвенциями - использовать предоставленные библиотекой методы Observable.generate() или Observable.fromIterable(). Они делают правильную проверку для вас.

Также убедитесь, что вы подписаны и соблюдаете различные темы. В противном случае вы будете обслуживать только одного абонента.

Ваш пример:

strObservable = Observable.generate(g -> g.onNext("Hello!")); 
strObservable.subscribeOn(Schedulers.newThread()).subscribe(strSubscriber);