2017-02-16 11 views
0

Я относительно новичок в RxJava2, и у меня возникают некоторые странные поведения, поэтому вполне вероятно, что я использую инструмент не по-другому.RxJava2 flatMap создает повторяющиеся события

Это довольно большой проект, но я отделил фрагмент кода ниже как минимум воспроизводимым код:

Observable 
    .interval(333, TimeUnit.MILLISECONDS) 
    .flatMap(new Function<Long, ObservableSource<Integer>>() { 
    private Subject<Integer> s = PublishSubject.create(); 
    private int val = 0; 

    @Override public ObservableSource<Integer> apply(Long aLong) throws Exception { 
     val++; 
     s.onNext(val); 
     return s; 
     } 
    }) 
    .subscribe(new Consumer<Integer>() { 
    @Override public void accept(Integer integer) throws Exception { 
     Log.w("value: %s", integer); 
    } 
    }); 

Этот код имитирует события из моей RX-потока с использованием .interval и flatMap получать эти события " выполните некоторую обработку »и использует Subject для вывода результатов вниз по потоку.

Поток - это непрерывный процесс, который будет иметь несколько событий.

Этот минимальный код глупый, потому что я нажимаю только на обратный вызов apply, но в реальном случае есть несколько возможных моментов, когда может произойти нажатие, а число событий, принимаемых во время apply, - это не то же количество который будет отправлен через Subject.

То, что я ожидал увидеть этот код:

value: 2 // 1 got skipped because onNext is called before there's a subscriber. 
value: 3 
value: 4 
value: 5 
value: 6 ... etc 

, что я на самом деле получил это:

value: 2 
value: 3 
value: 3 // 3 twice 
value: 4 
value: 4 
value: 4 // 4 repeated 3 times 
value: 5 
value: 5 
value: 5 
value: 5 // 5 repeated 4 times 
value: 6 
value: 6 
value: 6 
value: 6 
value: 6 // 6 repeated 5 times 
... etc 

Я также попытался иметь Observable<Integer> o = s.share(); и вернуть его, или вернуться непосредственно s.share(); с теми же результатами.

Я понимаю, почему это происходит. ObservableSource снова подписались на n снова n, так что в каждом цикле больше событий.

Вопрос:

Как я могу достигнуть моего ожидаемого поведения?

(в случае, если мое ожидаемое поведение было не ясно, пожалуйста, спросите больше на комментарии)

+0

Я бы попытался переместить 'private Subject s = PublishSubject.create();' в другой области, если бы я был в вас – Blackbelt

+0

@ Blackbelt, как я уже сказал. Это минимальный воспроизводимый код. В полном коде 'Function' есть свой класс. – Budius

+0

Я могу судить о том, что вижу, и не могу догадаться, что вы написали. – Blackbelt

ответ

1

Ваш PublishSubject подписан на несколько раз, один раз за единицу из интервала().

Редактировать: Вам нужно будет перейти в новый PublishSubject каждый раз (переключитесь на BehaviorSubject, если вы хотите сохранить первую/последнюю эмиссию); перейдите к длительному процессу и убедитесь, что его onComplete называется должным образом, когда заканчивается длительный процесс.

+0

Это я уже сказал. Мой вопрос заключается в том, как добиться такого поведения, которое мне нужно? – Budius

+0

Отредактированный ответ с рекомендациями. –

+0

Кроме того, я думаю, что использование PublishSubject немного громоздко. подумайте о том, чтобы более тесно интегрировать ваш длительный процесс с Rx. –

1

Редактировать

После недавних комментариев, которые я мог придумать такого рода решения:

class MyBluetoothClient { 
    private PublishSubject<BTLEEvent> statusPublishSubject = PublishSubject.create() 

    public Observable<BTLEEvent> getEventObservable() { 
    return statusPublishSubject 
    } 

    private void publishEvent(BTLEEvent event) { 
    statusPublishSubject.onNext(event) 
    } 

    public void doStuff1() { 
    // do something that returns: 
    publishEvent(BTLEEvent.someEvent1) 
    } 

    public void doStuff2() { 
    // do something else that eventually yields 
    publishEvent(BTLEEvent.someEvent2) 
    } 
} 

И использовать его таким образом:

MyBluetoothClient client = MyBluetoothClient() 
client 
    .getEventObservable() 
    .subscribe(/* */) 

/// 

client.doStuff1() 

/// 

client.doStuff2 

Оригинальный ответ

Будет ли это делать?

Observable 
    .interval(333, TimeUnit.MILLISECONDS) 
    .flatMap(new Function<Long, ObservableSource<Integer>>() { 
    private int val = 0; 

    @Override public ObservableSource<Integer> apply(Long aLong) throws Exception { 
     val++; 
     return Observable.just(val); 
     } 
    }) 
    .subscribe(new Consumer<Integer>() { 
    @Override public void accept(Integer integer) throws Exception { 
     Log.w("value: %s", integer); 
    } 
    }); 
+0

Спасибо за ваш ответ. Для этого очень маленького примера кода, который я написал, это будет. Но если вы прочитаете полный вопрос, объект может запускать новые события в любое время на основе других условий. Мое первое утро утром в офисе я попробую сделать предложение Tassos об использовании switchMap вместо – Budius

+0

Я вижу. Мне трудно придумать лучший ответ, который вы даете так же мало, как и вы. Просто обратите внимание, что вы можете вернуть любой вид Observable, например Observable.range (0, 5) .map (...). Take (4) ... и так далее. Дайте мне что-то еще, и я постараюсь найти лучший ответ;) – ULazdins

+0

Более конкретные варианты использования у меня есть (из нескольких других): прослушивание bluetooth LE сканирует, анализирует рамку рекламы (карту), фильтрует ожидаемые кадры, и (по этой проблеме) отделите показания на событие «enter», «update rssi» и используйте таймаут для генерации события «exit». Таким образом, класс сохраняет временную метку каждого события по мере их поступления на Карту и каждую секунду выполняет проверку на выходы «выхода». – Budius

0

Так вот ответ, который я придумал. Я буду отмечать @Tassos ответ так же правильно, как он указал мне на правильный путь.

Сначала мне нужен CachedSubject (предмет, который кэширует предметы, пока нет наблюдателей, и отправляет их, как только подключается наблюдатель), это необходимо, чтобы убедиться, что выбросы внутри apply действительно проходят. Класс в основном обертывает PublishSubject.

class CachedSubject<T> extends Subject<T> { 

     private PublishSubject<T> publishSubject = PublishSubject.create(); 
     private Queue<T> cache = new ConcurrentLinkedQueue<>(); 

     @Override public boolean hasObservers() { 
      return publishSubject.hasObservers(); 
     } 

     @Override public boolean hasThrowable() { 
      return publishSubject.hasThrowable(); 
     } 

     @Override public boolean hasComplete() { 
      return publishSubject.hasComplete(); 
     } 

     @Override public Throwable getThrowable() { 
      return publishSubject.getThrowable(); 
     } 

     @Override protected void subscribeActual(Observer<? super T> observer) { 
      while (cache.size() > 0) { 
       observer.onNext(cache.remove()); 
      } 
      publishSubject.subscribeActual(observer); 
     } 

     @Override public void onSubscribe(Disposable d) { 
      publishSubject.onSubscribe(d); 
     } 

     @Override public void onNext(T t) { 
      if (hasObservers()) { 
       publishSubject.onNext(t); 
      } else { 
       cache.add(t); 
      } 
     } 

     @Override public void onError(Throwable e) { 
      publishSubject.onError(e); 
     } 

     @Override public void onComplete() { 
      publishSubject.onComplete(); 
     } 
    } 

то я использую этот класс с switchMap:

Observable 
    .interval(1000, TimeUnit.MILLISECONDS) 
    .switchMap(new Function<Long, ObservableSource<Integer>>() { 

     private Subject<Integer> s = new CachedSubject<>(); 
     private int val = 0; 

     @Override public ObservableSource<Integer> apply(Long aLong) throws Exception { 
     val++; 
     s.onNext(val); 
     return s; 
     } 
    }) 
    .subscribe(new Consumer<Integer>() { 
     @Override public void accept(Integer integer) throws Exception { 
     Log.w("value: %s", integer); 
     } 
    }); 

Это фактически позволяет мне получить любое количество событий по методу apply<T t> и только 1 Consumer подписались на него, получая все события от него.

 Смежные вопросы

  • Нет связанных вопросов^_^