2016-03-02 4 views
6

Я пытаюсь использовать RxJS для простого короткого опроса. Он должен сделать запрос один раз каждые delay секунд до местоположения path на сервере, заканчивающегося после достижения одного из двух условий: либо обратный вызов isComplete(data) возвращает значение true, либо попробовал сервер более maxTries. Вот основной код:RxJS 5.0 «do while» как механизм

newShortPoll(path, maxTries, delay, isComplete) { 
    return Observable.interval(delay) 
    .take(maxTries) 
    .flatMap((tryNumber) => http.get(path)) 
    .doWhile((data) => !isComplete(data)); 
    } 

Однако doWhile не существует в RxJS 5.0, поэтому состояние, при котором он может только попытаться серверу maxTries работы, благодаря вызову взять(), но условие isComplete не Работа. Как я могу сделать так, чтобы наблюдаемые значения next() менялись до тех пор, пока isComplete не вернет true, и в этот момент он будет следующим() этим значением и complete().

Следует отметить, что здесь takeWhile() не работает. Он не возвращает последнее значение, которое на самом деле является самым важным, так как мы знаем, что это сделано.

Спасибо!

+0

Возможно дубликат: http://stackoverflow.com/questions/36007911/rxjs-poll-until-interval-done-or-correct-data-received –

+0

Это не дублировать в том смысле, что в вопросе возникает вопрос о замене 'doWhile'. –

ответ

0

Мы можем создать функцию полезности для создания второго наблюдаемого, который испускает каждый элемент, который испускает внутренний наблюдаемый; Однако, мы будем называть OnCompleted функцию, как только наш условие:

function takeUntilInclusive(inner$, predicate) { 
    return Rx.Observable.create(observer => { 
     var subscription = inner$.subscribe(item => { 
      observer.onNext(item); 

      if (predicate(item)) { 
       observer.onCompleted(); 
      } 
     }, observer.onError, observer.onCompleted); 


     return() => { 
      subscription.dispose(); 
     } 
    }); 
} 

А вот быстрый отрывок с помощью нашего нового метода полезности:

const inner$ = Rx.Observable.range(0, 4); 
const data$ = takeUntilInclusive(inner$, (x) => x > 2); 
data$.subscribe(x => console.log(x)); 

// >> 0 
// >> 1 
// >> 2 
// >> 3 

Этот ответ базируется: RX Observable.TakeWhile checks condition BEFORE each element but I need to perform the check after

+0

Как ни странно, это решение работает примерно в полтора раза. Иногда конечная партия данных делает это подписчикам, иногда просто нет. Любая идея почему? Переключение на switchMap не исправило это. –

0

Вы можете добиться этого, используя retry и first операторов.

// helper observable that can return incomplete/complete data or fail. 
 
var server = Rx.Observable.create(function (observer) { 
 
    var x = Math.random(); 
 

 
    if(x < 0.1) { 
 
    observer.next(true); 
 
    } else if (x < 0.5) { 
 
    observer.error("error"); 
 
    } else { 
 
    observer.next(false); 
 
    } 
 
    observer.complete(); 
 

 
    return function() { 
 
    }; 
 
}); 
 
    
 
function isComplete(data) { 
 
    return data; 
 
} 
 
    
 
var delay = 1000; 
 
Rx.Observable.interval(delay) 
 
    .switchMap(() => { 
 
    return server 
 
     .do((data) => { 
 
     console.log('Server returned ' + data); 
 
     },() => { 
 
     console.log('Server threw'); 
 
     }) 
 
     .retry(3); 
 
    }) 
 
    .first((data) => isComplete(data)) 
 
    .subscribe(() => { 
 
    console.log('Got completed value'); 
 
    },() => { 
 
    console.log('Got error'); 
 
    });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>