У меня есть следующий код, основанный на примере, представленном @ a.bertucci здесь Emit objects for drawing in the UI in a regular interval using RxJava on Android, где я фиксирую наблюдаемый с помощью таймера. Когда я вызываю подписку, вызывая processDelayedItems(), код [A] в zipped Observable выполняется ровно один раз, и один элемент испускается на [B]. Я бы ожидал, что код [A] будет запускаться непрерывно после запуска и продолжит излучать элементы каждые 1500 мс, но он, очевидно, работает только один раз здесь.Почему это наблюдаемое испускает только одно значение
private static void processDelayedItems() {
Observable.zip(
Observable.create(new Observable.OnSubscribe<Object>() {
@Override public void call(Subscriber<? super Object> subscriber) {
// [A] this code is only called once
subscriber.OnNext(o)
}
}),
Observable.timer(1500, 1500, TimeUnit.MILLISECONDS), new Func2<Object, Long, Object>() {
@Override public Object call(Object entity, Long aLong) {
return entity;
}
}
)
.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Object>() {
@Override public void call(Object entity) {
// ... and accordingly one item is emitted [B]
}
}, new Action1<Throwable>() {
@Override public void call(Throwable throwable) {
throwable.printStackTrace();
}
}, new Action0() {
@Override public void call() {
}
});
}
Может кто-нибудь увидеть проблему, которую я здесь? Нужно ли мне ссылаться на Observable из-за пределов функции, чтобы поддерживать ее в течение более продолжительного времени? Собирается ли он GC (Android)? Проблема в том, что эта функция является статической?
Каковы правила наблюдений с точки зрения их жизни? Есть ли какие-либо рекомендации о том, как следует использовать ссылки на Observables, и если они могут быть статическими вообще? В моих тестах я заметил, что это не имеет большого значения, но, возможно, здесь, когда задействован таймер.
-
Исправленный код [не работает еще]:
добавил повтор()
Observable.zip( Observable.create(new Observable.OnSubscribe<Object>() { @Override public void call(Subscriber<? super Object> subscriber) { // [A] this code is only called once subscriber.OnNext(o); subscriber.OnCompleted(); } }).repeat(Schedulers.newThread()), Observable.timer(1500, 1500, TimeUnit.MILLISECONDS), new Func2<Object, Long, Object>() { @Override public Object call(Object entity, Long aLong) { return entity; } } ) .subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Object>() { @Override public void call(Object entity) { // ... and accordingly one item is emitted [B] } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { throwable.printStackTrace(); } }, new Action0() { @Override public void call() { } });
Благодарим вас за подробный ответ. Это имеет смысл, потому что Y-комбинатор ищет наблюдаемые с обеих сторон, и если нет ничего, чтобы застегнуть молнию с одной стороны, тогда ничего не испускается. –
Я только что испытал и - не знаю почему - Observable все еще испускает только один раз. Мой исправленный код выше моего первоначального сообщения.Несмотря на то, что я не думаю, что так, пожалуйста, проверьте, добавил ли я повтор() к правильному Наблюдаемому. –
Другой вопрос: Есть ли способ достичь одинакового по-другому? Как вместо использования zip, с таймером/интервалом Наблюдаемые испускают регулярные интервалы, которые затем запускают еще один Наблюдаемый, поочередно переключаемый за таймером/интервалом Oberservable? –