Мне нужно создать очередь, где, когда элемент добавляется в очередь, абонент должен быть уведомлен. Единственное решение для моего мозга - использовать цикл while для опроса очереди или использовать просмотр очереди, чтобы сравнить, добавляется ли новый объект в очередь, если новый элемент найден, а затем использовать onNext() для пересылки элемента подписчику. Это был бы бесконечный цикл, и именно это мне нужно наблюдать за очередью.Создание реактивной очереди или структуры данных с использованием RxJava
ответ
Вы можете подклассифицировать одну из очередей и использовать методы add
/offer
/etc ..., которые вызывают Observable
.
public class SubscriberQueue<E> extends LinkedList<E> {
private Observable<E> mObservable = PublishSubscriber.create();
@Override
public boolean add(E e) {
if (super.add()) {
mObservable.onNext(e);
}
}
public Observable<E> getObservable() {return mObservable;}
}
// Test code
SubscriberQueue<Integer> myQueue = new SubscriberQueue<>();
myQueue.getObservable().subscribe(i -> System.out.println("Added: " + i));
myQueue.add(1);
Вы рекомендуете запускать объект, когда новый объект добавляется в SubscriberQueue. I.e, мне нужно сначала добавить элемент в структуру данных, а затем отправить элемент подписчику. – user64287
Код выше ничего не достигает. Мы должны сделать функцию наблюдаемой для подписки на нее. Я не вижу, как это работает с вышеуказанной функцией. – user64287
OK @ user64287, я заполнил недостающие пробелы, которые, как я думаю, отключили вас в приведенном выше примере. Проясняет ли это? – jenglert
Можете ли вы предоставить реализацию очереди? –
Его требование. Вышеупомянутая теория - это просто теория, которая может использовать peek(), чтобы получить элемент и сравнить его с предыдущим написанным элементом. – user64287
Вы знаете о 'PublishSubject'? Когда вы говорите бесконечный цикл, вы подразумеваете, что он блокирует? Это не похоже на реактивный стиль. –