Я предполагаю, что у вас есть горячий наблюдаемый с несколькими подписками, но хочу только одна подписки на получение событий. Как только текущая подписка будет отписана, следующий должен начать получать.
Что мы можем сделать, это предоставить каждую подписку и уникальный номер и сохранить список всех подписчиков. Только первая подписка в списке получает события, остальные - filter
. Использование
public class SingleSubscribe {
List<Integer> current = Collections.synchronizedList(new ArrayList<>());
int max = 0;
Object gate = new Object();
private SingleSubscribe() {
}
public static <T> Transformer<T, T> singleSubscribe() {
return new SingleSubscribe().obs();
}
private <T> Transformer<T, T> obs() {
return (source) -> Observable.create((Subscriber<? super T> ob) -> {
Integer me;
synchronized (gate) {
me = max++;
}
current.add(me);
source.doOnUnsubscribe(() -> current.remove(me)).filter(__ -> {
return current.get(0) == me;
}).subscribe(ob);
});
}
Пример:
public static void main(String[] args) throws InterruptedException, IOException {
ConnectableObservable<Long> connectable = Observable.interval(500, TimeUnit.MILLISECONDS)
.publish();
Observable<Long> hot = connectable.compose(SingleSubscribe.<Long> singleSubscribe());
Subscription sub = connectable.connect();
Func1<Integer, rx.Observer<Long>> obs = (Integer i) -> Observers.create(el -> System.out.println(i + "next: " + el),
er -> {
System.out.println(i + "error: " + er);
er.printStackTrace();
} ,() -> System.out.println(i + "completed"));
System.out.println("1");
Subscription s = hot.subscribe(obs.call(1));
System.out.println("2");
hot.take(4).subscribe(obs.call(2));
Thread.sleep(1500);
s.unsubscribe();
System.out.println("3");
Thread.sleep(500);
hot.take(2).subscribe(obs.call(3));
System.out.println("4");
System.in.read();
sub.unsubscribe();
}
}
Выход:
1
2
1next: 0
1next: 1
1next: 2
3
2next: 3
4
2next: 4
2next: 5
2next: 6
2completed
3next: 6
3next: 7
3completed
Обратите внимание, есть небольшой недостаток в выходе: Из-2 отписывается сразу после того, он получает 6, но до 3 принимает 6 После 2 отписок, 3 является следующим активным наблюдателем и с радостью принимает 6.
Раствор будет задерживать doOnUnsubscribe
, самый простой способ планировать действия на новый планировщик потоков:
source.doOnUnsubscribe(() -> Schedulers.newThread().createWorker().schedule(()-> current.remove(me)))
Однако, это означает, что теперь есть небольшой шанс на следующий элемент, излучаемый игнорируется, когда 2 отменить подписку, а следующий элемент появится до активации 3. Используйте вариант, который наиболее подходит для вас.
Наконец, это решение абсолютно предполагает, что источник будет горячим. Я не уверен, что произойдет, когда вы примените этот оператор к холодному наблюдаемому, но результаты могут быть неожиданными.
Это не то, что означает «горячий наблюдаемый». Наблюдается наблюдаемое значение «горячего наблюдения», которое влияет на побочные эффекты между подписками. Например, когда он публикует данные в прямом эфире. То, что вы действительно хотите, это источник, который передает наблюдаемые данные, которые дают только один раз, когда все предыдущие наблюдаемые делаются. В: Нужно ли блокировать вызов подписки или это достаточно хорошо, если второй Observable не дает 'onNext', пока предыдущий не назвал' onCompleted'? – Dorus
Также я не знаю, как построить оператор, который сделает это за вас, но я уверен, что это можно сделать с помощью существующих операторов. – Dorus
Еще одна неясная часть: вы выполняете внутренний холодный наблюдаемый, который подписывается подпиской в свою очередь, или существует ли продолжительная горячая наблюдаемая ситуация, когда следующая подписка имеет право только на сообщения, когда последняя отменяет подписку? – Dorus