2016-02-09 3 views
3

TL; DR: Как вы создаете Observable, который создает только подписку, если она холодно, и ставит в очередь любые другие вызовы подписки, когда это hot?RxJava: Как подписаться, только если Observable Cold?

Я хотел бы создать a Observable, который может выполнять только одну подписку за раз. Если какие-либо другие подписчики подписываются на Observable, я бы хотел, чтобы они были поставлены в очередь для запуска, когда Observable завершен (после onComplete).

Я могу построить эту конструкцию самостоятельно, имея какой-то стек и выскакивая стек на каждом onComplete, но похоже, что эта функциональность уже существует в RxJava.

Есть ли способ ограничить подписку таким образом?

(More on hot and cold observables)

+0

Это не то, что означает «горячий наблюдаемый». Наблюдается наблюдаемое значение «горячего наблюдения», которое влияет на побочные эффекты между подписками. Например, когда он публикует данные в прямом эфире. То, что вы действительно хотите, это источник, который передает наблюдаемые данные, которые дают только один раз, когда все предыдущие наблюдаемые делаются. В: Нужно ли блокировать вызов подписки или это достаточно хорошо, если второй Observable не дает 'onNext', пока предыдущий не назвал' onCompleted'? – Dorus

+0

Также я не знаю, как построить оператор, который сделает это за вас, но я уверен, что это можно сделать с помощью существующих операторов. – Dorus

+0

Еще одна неясная часть: вы выполняете внутренний холодный наблюдаемый, который подписывается подпиской в ​​свою очередь, или существует ли продолжительная горячая наблюдаемая ситуация, когда следующая подписка имеет право только на сообщения, когда последняя отменяет подписку? – Dorus

ответ

2

Там нет встроенных операторов или сочетание операторов я могу думать о том, что можно достичь. Вот как я бы реализовать:

import java.util.Queue; 
import java.util.concurrent.ConcurrentLinkedQueue; 
import java.util.concurrent.atomic.*; 

import rx.*; 
import rx.observers.TestSubscriber; 
import rx.subjects.PublishSubject; 
import rx.subscriptions.Subscriptions; 

public final class SequenceSubscribers<T> implements Observable.OnSubscribe<T> { 

    final Observable<? extends T> source; 

    final Queue<Subscriber<? super T>> queue; 

    final AtomicInteger wip; 

    volatile boolean active; 

    public SequenceSubscribers(Observable<? extends T> source) { 
     this.source = source; 
     this.queue = new ConcurrentLinkedQueue<>(); 
     this.wip = new AtomicInteger(); 
    } 

    @Override 
    public void call(Subscriber<? super T> t) { 
     SubscriberWrapper wrapper = new SubscriberWrapper(t); 
     queue.add(wrapper); 

     t.add(wrapper); 
     t.add(Subscriptions.create(() -> wrapper.next())); 

     drain(); 
    } 

    void complete(SubscriberWrapper inner) { 
     active = false; 
     drain(); 
    } 

    void drain() { 
     if (wip.getAndIncrement() != 0) { 
      return; 
     } 
     do { 
      if (!active) { 
       Subscriber<? super T> s = queue.poll(); 
       if (s != null && !s.isUnsubscribed()) { 
        active = true; 
        source.subscribe(s); 
       } 
      } 
     } while (wip.decrementAndGet() != 0); 
    } 

    final class SubscriberWrapper extends Subscriber<T> { 
     final Subscriber<? super T> actual; 

     final AtomicBoolean once; 

     public SubscriberWrapper(Subscriber<? super T> actual) { 
      this.actual = actual; 
      this.once = new AtomicBoolean(); 
     } 

     @Override 
     public void onNext(T t) { 
      actual.onNext(t); 
     } 

     @Override 
     public void onError(Throwable e) { 
      actual.onError(e); 
      next(); 
     } 

     @Override 
     public void onCompleted() { 
      actual.onCompleted(); 
      next(); 
     } 

     @Override 
     public void setProducer(Producer p) { 
      actual.setProducer(p); 
     } 

     void next() { 
      if (once.compareAndSet(false, true)) { 
       complete(this); 
      } 
     } 
    } 

    public static void main(String[] args) { 
     PublishSubject<Integer> ps = PublishSubject.create(); 

     TestSubscriber<Integer> ts1 = TestSubscriber.create(); 
     TestSubscriber<Integer> ts2 = TestSubscriber.create(); 

     Observable<Integer> source = Observable.create(new SequenceSubscribers<>(ps)); 

     source.subscribe(ts1); 
     source.subscribe(ts2); 

     ps.onNext(1); 
     ps.onNext(2); 

     ts1.assertValues(1, 2); 
     ts2.assertNoValues(); 

     ts1.unsubscribe(); 

     ps.onNext(3); 
     ps.onNext(4); 
     ps.onCompleted(); 

     ts1.assertValues(1, 2); 
     ts2.assertValues(3, 4); 
     ts2.assertCompleted(); 
    } 
} 
0

Я предполагаю, что у вас есть основной холодный наблюдаемые, что вы хотите подписаться на несколько раз, но только после того, как предыдущая подписки закончена.

Мы можем разумно использовать функцию построения delaySubscription, которая может задерживать новые подписки. Следующим препятствием является запуск подписки при завершении предыдущей подписки. Мы делаем это с помощью doOnUnsubscribe, который запускает как отмену подписки, onError, так и onCompleted действия.

public class DelaySubscribe<T> { 
    Observable<Integer> previouse = Observable.just(0); 

    private DelaySubscribe() { 

    } 

    public static <T> Observable<T> makeDelayOb(Observable<T> cold) { 
     return new DelaySubscribe<T>().obs(cold); 
    } 

    private Observable<T> obs(Observable<T> cold) { 
     return Observable.create(ob -> { 
      Observable<Integer> tmp = previouse; 
      ReplaySubject<Integer> rep = ReplaySubject.create(); 
      previouse = rep; 
      cold.delaySubscription(() -> tmp).doOnUnsubscribe(() -> { 
       rep.onNext(0); 
       rep.onCompleted(); 
      }).subscribe(ob); 
     }); 
    } 

Пример использование:

public static void main(String[] args) throws IOException, InterruptedException { 
     Observable<Long> cold = Observable.interval(1, TimeUnit.SECONDS).take(2); 
     Observable<Long> hot = makeDelayOb(cold); 

     Func1<Integer, rx.Observer<Long>> obs = (Integer i) -> Observers.create(el -> System.out.println(i + "next: " + el), 
       er -> System.out.println(i + "error: " + er),() -> System.out.println(i + "completed")); 

     System.out.println("1"); 
     Subscription s = hot.subscribe(obs.call(1)); 
     System.out.println("2"); 
     hot.subscribe(obs.call(2)); 
     Thread.sleep(1500); 
     s.unsubscribe(); 
     System.out.println("3"); 
     Thread.sleep(3500); 
     hot.subscribe(obs.call(3)); 
     System.out.println("4"); 
     System.in.read(); 
    } 
} 

Выход:

1 
2 
1next: 0 
3 
2next: 0 
2next: 1 
2completed 
4 
3next: 0 
3next: 1 
3completed 
0

Я предполагаю, что у вас есть горячий наблюдаемый с несколькими подписками, но хочу только одна подписки на получение событий. Как только текущая подписка будет отписана, следующий должен начать получать.

Что мы можем сделать, это предоставить каждую подписку и уникальный номер и сохранить список всех подписчиков. Только первая подписка в списке получает события, остальные - filter. Использование

public class SingleSubscribe { 
    List<Integer> current = Collections.synchronizedList(new ArrayList<>()); 
    int max = 0; 
    Object gate = new Object(); 

    private SingleSubscribe() { 
    } 

    public static <T> Transformer<T, T> singleSubscribe() { 
     return new SingleSubscribe().obs(); 
    } 

    private <T> Transformer<T, T> obs() { 
     return (source) -> Observable.create((Subscriber<? super T> ob) -> { 
      Integer me; 
      synchronized (gate) { 
       me = max++; 
      } 
      current.add(me); 
      source.doOnUnsubscribe(() -> current.remove(me)).filter(__ -> { 
       return current.get(0) == me; 
      }).subscribe(ob); 
     }); 
    } 

Пример:

public static void main(String[] args) throws InterruptedException, IOException { 
     ConnectableObservable<Long> connectable = Observable.interval(500, TimeUnit.MILLISECONDS) 
       .publish(); 
     Observable<Long> hot = connectable.compose(SingleSubscribe.<Long> singleSubscribe()); 
     Subscription sub = connectable.connect(); 

     Func1<Integer, rx.Observer<Long>> obs = (Integer i) -> Observers.create(el -> System.out.println(i + "next: " + el), 
       er -> { 
        System.out.println(i + "error: " + er); 
        er.printStackTrace(); 
       } ,() -> System.out.println(i + "completed")); 

     System.out.println("1"); 
     Subscription s = hot.subscribe(obs.call(1)); 
     System.out.println("2"); 
     hot.take(4).subscribe(obs.call(2)); 
     Thread.sleep(1500); 
     s.unsubscribe(); 
     System.out.println("3"); 
     Thread.sleep(500); 
     hot.take(2).subscribe(obs.call(3)); 
     System.out.println("4"); 
     System.in.read(); 
     sub.unsubscribe(); 
    } 
} 

Выход:

1 
2 
1next: 0 
1next: 1 
1next: 2 
3 
2next: 3 
4 
2next: 4 
2next: 5 
2next: 6 
2completed 
3next: 6 
3next: 7 
3completed 

Обратите внимание, есть небольшой недостаток в выходе: Из-2 отписывается сразу после того, он получает 6, но до 3 принимает 6 После 2 отписок, 3 является следующим активным наблюдателем и с радостью принимает 6.

Раствор будет задерживать doOnUnsubscribe, самый простой способ планировать действия на новый планировщик потоков:

source.doOnUnsubscribe(() -> Schedulers.newThread().createWorker().schedule(()-> current.remove(me))) 

Однако, это означает, что теперь есть небольшой шанс на следующий элемент, излучаемый игнорируется, когда 2 отменить подписку, а следующий элемент появится до активации 3. Используйте вариант, который наиболее подходит для вас.

Наконец, это решение абсолютно предполагает, что источник будет горячим. Я не уверен, что произойдет, когда вы примените этот оператор к холодному наблюдаемому, но результаты могут быть неожиданными.

 Смежные вопросы

  • Нет связанных вопросов^_^