2017-02-16 11 views
2

Попытка реализовать BufferUntil для Observable(то есть, буферные элементы и испускают буферный список всякий раз, когда некоторые критерии удовлетворены)Невозможно реализовать BufferUntil RxJava

реализована для RxNET

Определить:

public static IObservable<IList<TSource>> BufferUntil<TSource>(
       this IObservable<TSource> source, 
       Func<TSource, bool> predicate) 
    { 
     var published = source.Publish().RefCount(); 
     return published.Buffer(() => published.Where(predicate)); 
    } 

Использование:

var list = new List<char> { 'a', 'b', 'c', 'd', 'e', 'c' }; 
list 
    .ToObservable() 
    .BufferUntil(c => c == 'c') 
    .Subscribe(c => c.ToList().ForEach(Console.WriteLine)); 

Результат:

работает отлично. Издает 2 IList из char: ['a','b','c'] и ['d','e','c']

Попытка реализовать точно такой же для RxJava

Определение:

public <T extends Object> Observable<List<T>> bufferUntil(
     Observable<T> source, 
     final Func1<T, Boolean> bufferClosingCriteria) { 

    final Observable<T> published = source.publish().refCount(); 
    return published.buffer(new Func0<Observable<T>>() { 
     @Override 
     public Observable<T> call() { 
      return published.filter(bufferClosingCriteria); 
     } 
    }); 
} 

Использование:

Character[] arr = {'a','b','c','d','e','c'}; 

bufferUntil(rx.Observable.from(arr), new Func1<Character, Boolean>() { 
    @Override 
    public Boolean call(Character character) { 
     return character == 'c'; 
    } 
}).subscribe(new Action1<List<Character>>() { 
    @Override 
    public void call(List<Character> o) { 
     for (int i = 0; i < o.size(); ++i) 
      Log.d("LL", o.get(i).toString()); 
    } 
}); 

Результат:

Он излучает 3 Пустой список

Что с реализацией RxJava и/или как это исправить?

ответ

2

Проблема заключается в том, что RxJava работает по-разному из-за синхронности по умолчанию, тогда как Rx.NET имеет асинхронные источники.

Что происходит в RxJava, так это то, что когда граница подписывается на refCount, восходящий поток начинает немедленно испускать все символы, проходит через массив символов, а граница продолжает сигнализировать всякий раз, когда видит «c». На данный момент оператор буфера даже не подписался на тот же источник refCount и никогда не видит никаких данных, поступающих от него.

В Rx.NET, насколько я понимаю, тот факт, что List.ToObservable переходит в другой поток, чтобы испускать элементы массива, что дает достаточно времени для того, чтобы исходный поток имел как буфер, так и его границу подписываться к RefCount.

Если вы добавили небольшую задержку source.delay(500, TimeUnit.MILLISECONDS).publish().refCount() вы видите данные и списки приходят через, однако, содержание списка будет все-таки отличаются:

[a, b] 
[c, d, e] 
[c] 

Причина этого заключается в том, что в RxJava, граница Observable сначала подписывается, а затем подписчик буфера, поэтому, когда приходит «c», он достигает первой границы, которая запускает разделение буфера, а затем оператор буфера получает значение «c» и сохраняет его в новом буфере.

Для того, чтобы убедиться, что оператор работает с синхронным источника, вы должны использовать publish(Func1) вместо RefCount:

public static <T extends Object> Observable<List<T>> bufferUntil(
     Observable<T> source, 
     final Func1<T, Boolean> bufferClosingCriteria) { 

    return source.publish(o -> o.buffer(() -> o.filter(bufferClosingCriteria))); 
} 

Этот publish() вариант убеждается каждый потенциальный потребитель «о» устанавливается перед источником потребляется ,

Упорядочивание подписки, чтобы убедиться, «с» заканчивается в конце буфера немного сложнее:

return source.publish(o -> { 
    PublishSubject<Object> ps = PublishSubject.create(); 
    return o.buffer(() -> ps) 
      .mergeWith(Observable.defer(() -> { 
       o.filter(bufferClosingCriteria).subscribe(ps); 
       return Observable.empty(); 
      })) 
      .filter(list -> !list.isEmpty()); 
}); 

Урожайность:

[a, b, c] 
[d, e, c] 

 Смежные вопросы

  • Нет связанных вопросов^_^