3

У меня есть следующие Горячие Observable:RxJava - Как остановить (и возобновить) Hot Observable (интервал)?

hotObservable = Observable.interval(0L, 1L, TimeUnit.SECONDS) 
          .map((t) -> getCurrentTimeInMillis())) 

Однако, я не могу найти хороший способ, чтобы остановить его. Я был в состоянии частично решить эту проблему с помощью takeWhile и boolean флага (runTimer):

Observable.interval(0L, 1L, TimeUnit.SECONDS) 
      .takeWhile((t) -> runTimer) 
      .map((t) -> getCurrentTimeInMillis())) 

Есть 2 вещи, которые мне не нравится в этом подходе, хотя:

  1. я должен держать флаг runTimer вокруг, чего я не хочу.
  2. После runTimer становится false, Наблюдаемый просто завершает, что означает, что если я хочу снова испустить, мне нужно создать новый Наблюдаемый. Я не хочу этого. Я просто хочу, чтобы Observable перестала испускать предметы, пока я не скажу, чтобы она начала снова.

Я надеялся на что-то вроде этого:

hotObservable.stop(); 
hotObservable.resume(); 

Таким образом, мне не нужно держать флаги вокруг и наблюдаемое всегда жив (он не может быть излучающими событий, хотя).

Как я могу это достичь?

+0

В случае, если наблюдаемая остановка * * продуцирующие элементы, или продолжить работу, но остановить * излучающие * их? – npace

+0

@npace Остановка производства была бы идеальной, но я был бы счастлив прекратить излучать только. Оба работают в моей ситуации. – Tiago

+1

Ответ Kiskae совершенно правдоподобен, но просто FYI .. если вы замените оператор 'takeWhile' в своем предыдущем решении с помощью оператора' filter', вы избавитесь от второго недостатка ... – koperko

ответ

6

Одним из возможных подходов использует BehaviorSubject и switchMap:

BehaviorSubject<Boolean> subject = BehaviorSubject.create(true); 
hotObservable = subject.distinctUntilChanged().switchMap((on) -> { 
    if (on) { 
     return Observable.interval(0L, 1L, TimeUnit.SECONDS); 
    } else { 
     return Observable.never(); 
    } 
}).map((t) -> getCurrentTimeInMillis()); 

Послав булевы субъекту выход наблюдаемой можно управлять. subject.onNext(true) приведет к появлению любых наблюдаемых, созданных с использованием этого объекта, для начала испускания значений. subject.onNext(false) отключает этот поток.

switchMap заботится об устранении лежащих в основе наблюдаемых при его выключении. Он также использует distinctUntilChanged, чтобы убедиться, что он не делает ненужного переключения.

0

Mybe вы можете использовать этот

Observable.interval(3, TimeUnit.SECONDS) 
     .takeUntil(stop) 
     .subscribe(new MyObserver()); 

Thread.sleep(10000); 
stop.onNext(-1); 

 Смежные вопросы

  • Нет связанных вопросов^_^