Попытка реализовать BufferUntil
для Observable
(то есть, буферные элементы и испускают буферный список всякий раз, когда некоторые критерии удовлетворены)Невозможно реализовать BufferUntil RxJava
реализована для RxNET
Определить:
public static IObservable<IList<TSource>> BufferUntil<TSource>(
this IObservable<TSource> source,
Func<TSource, bool> predicate)
{
var published = source.Publish().RefCount();
return published.Buffer(() => published.Where(predicate));
}
Использование:
var list = new List<char> { 'a', 'b', 'c', 'd', 'e', 'c' };
list
.ToObservable()
.BufferUntil(c => c == 'c')
.Subscribe(c => c.ToList().ForEach(Console.WriteLine));
Результат:
работает отлично. Издает 2 IList
из char
: ['a','b','c']
и ['d','e','c']
Попытка реализовать точно такой же для RxJava
Определение:
public <T extends Object> Observable<List<T>> bufferUntil(
Observable<T> source,
final Func1<T, Boolean> bufferClosingCriteria) {
final Observable<T> published = source.publish().refCount();
return published.buffer(new Func0<Observable<T>>() {
@Override
public Observable<T> call() {
return published.filter(bufferClosingCriteria);
}
});
}
Использование:
Character[] arr = {'a','b','c','d','e','c'};
bufferUntil(rx.Observable.from(arr), new Func1<Character, Boolean>() {
@Override
public Boolean call(Character character) {
return character == 'c';
}
}).subscribe(new Action1<List<Character>>() {
@Override
public void call(List<Character> o) {
for (int i = 0; i < o.size(); ++i)
Log.d("LL", o.get(i).toString());
}
});
Результат:
Он излучает 3 Пустой список
Что с реализацией RxJava и/или как это исправить?