2016-12-29 2 views
3

Итак, я бы хотел избежать рекурсии с помощью наблюдаемых, используя комбинацию внешних и внутренних событий вместо вызова того же метода/функции.Избегайте рекурсии с наблюдаемыми значениями RxJS5

Сейчас у меня есть это:

Queue.prototype.deq = function (opts) { 

    opts = opts || {}; 

    const noOfLines = opts.noOfLines || opts.count || 1; 
    const isConnect = opts.isConnect !== false; 

    let $dequeue = this.init() 
     .flatMap(() => { 
      return acquireLock(this) 
       .flatMap(obj => { 
        if(obj.error){ 

        // if there is an error acquiring the lock we 
        // retry after 100 ms, which means using recursion 
        // because we call "this.deq()" again 

         return Rx.Observable.timer(100) 
          .flatMap(() => this.deq(opts)); 
        } 
        else{ 
         return makeGenericObservable() 
          .map(() => obj); 
        } 
       }) 

     }) 
     .flatMap(obj => { 
      return removeOneLine(this) 
       .map(l => ({l: l, id: obj.id})) 
     }) 
     .flatMap(obj => { 
      return releaseLock(this, obj.id) 
       .map(() => obj.l) 
     }) 
     .catch(e => { 
      console.error(e.stack || e); 
      return releaseLock(this); 
     }); 

    if (isConnect) { 
     $dequeue = $dequeue.publish(); 
     $dequeue.connect(); 
    } 

    return $dequeue; 

}; 

вместо выше, который использует рекурсию (надеюсь правильно), я хотел бы использовать более evented подход. Если есть ошибка, полученная из функции полученияLock, я бы хотел повторить каждые 100 мс, как только это удастся, я хочу остановиться, я не уверен, как это сделать, и мне сложно проверить его ... Это правда?

Queue.prototype.deq = function (opts) { 

    // .... 

    let $dequeue = this.init() 
     .flatMap(() => { 
      return acquireLock(this) 
       .flatMap(obj => { 
        if(obj.error){ 
         return Rx.Observable.interval(100) 
          .takeUntil(
           acquireLock(this) 
           .filter(obj => !obj.error) 
          ) 
        } 
        else{ 

         // this is just an "empty" observable 
         // which immediately fires onNext() 

         return makeGenericObservable() 
           .map(() => obj); 
        } 
       }) 

     }) 

    // ... 

    return $dequeue; 

}; 

Есть ли способ сделать это более кратким? Я также хотел бы повторить только 5 раз или около того. Мой основной вопрос: Как я могу создать счетчик вместе с интервалом, чтобы каждые 100 мс повторять попытку до тех пор, пока не будет получена блокировка или счет не достигнет 5?

мне нужно что-то вроде этого:

.takeUntil(this or that) 

, возможно, я просто ЦЕПЬ takeUntils, как так:

    return Rx.Observable.interval(100) 
        .takeUntil(
         acquireLock(this) 
         .filter(obj => !obj.error) 
        ) 
        .takeUntil(++count < 5); 

Я мог бы сделать это:

   return Rx.Observable.interval(100) 
        .takeUntil(
         acquireLock(this) 
         .filter(obj => !obj.error) 
        ) 
        .takeUntil(Rx.Observable.timer(500)); 

Но, вероятно, ищу что-то немного меньше kludgy

Но я не знаю, где (магазин/отслеживать) переменную count ...

ищет Кроме того, чтобы сделать это более кратким и, возможно, проверить его правильность.

Должен сказать, если это работает, это очень мощные конструкции кодирования.

ответ

1

Существует два оператора, которые могут вам помочь: retry и retryWhen. Обе повторно подписываются на источник, наблюдаемый, и таким образом повторяет неудачную операцию.

Проверить этот пример, где мы имеем наблюдаемую, что терпит неудачу на первых count подписок:

let getObs = (count) => { 
 
    return Rx.Observable.create((subs) => { 
 
    console.log('Subscription count = ', count); 
 

 
    if(count) { 
 
     count--; 
 
     subs.error("ERROR"); 
 
    } else { 
 
     subs.next("SUCCESS"); 
 
     subs.complete(); 
 
    } 
 
    
 
    return() => {}; 
 
    }); 
 
}; 
 

 
getObs(2).subscribe(console.log, console.log); 
 
getObs(2).retry(2).subscribe(console.log, console.log); 
 
getObs(3).retry(2).subscribe(console.log, console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

Как вы можете видеть:

  • Если мы называем его, как это он не удастся
  • С retry мы можем, ну, получить y это несколько раз и пуе ответ успеха
  • Если наблюдаемый сбой слишком много, то retry будет сдавать и распространять ошибку вдоль цепочки.

Что вам нужно, это retryWhen, потому что retry пытается выполнить операцию без задержки.

let getObs = (count) => { 
 
    return Rx.Observable.create((subs) => { 
 
    if(count) { 
 
     count--; 
 
     subs.error("ERROR"); 
 
    } else { 
 
     subs.next("SUCCESS"); 
 
     subs.complete(); 
 
    } 
 
    
 
    return() => {}; 
 
    }); 
 
}; 
 

 
getObs(2).retryWhen(errors => errors.delay(100)) 
 
    .subscribe(console.log, console.log); 
 
getObs(4).retryWhen(errors => errors.delay(100)) 
 
    .subscribe(console.log, console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

Легко добавить задержку с retryWhen, но заставить его потерпеть неудачу после того, как число попыток труднее:

let getObs = (count) => { 
 
    return Rx.Observable.create((subs) => { 
 
    if(count) { 
 
     count--; 
 
     subs.error("ERROR"); 
 
    } else { 
 
     subs.next("SUCCESS"); 
 
     subs.complete(); 
 
    } 
 
    
 
    return() => {}; 
 
    }); 
 
}; 
 

 
getObs(2) 
 
    .retryWhen(errors => { 
 
    return errors.delay(100).scan((errorCount, err) => { 
 
     if(!errorCount) { 
 
     throw err; 
 
     } 
 
     return --errorCount; 
 
    }, 2); 
 
    }) 
 
    .subscribe(console.log, console.log); 
 

 
getObs(4) 
 
    .retryWhen(errors => { 
 
    return errors.delay(100).scan((errorCount, err) => { 
 
     if(!errorCount) { 
 
     throw err; 
 
     } 
 
     return --errorCount; 
 
    }, 2); 
 
    }) 
 
    .subscribe(console.log, console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

И, наконец, как повторы ожидают, что ошибка будет сброшена, поэтому вам нужно сделать это, когда приобретаете блокировка:

.flatMap(() => { 
     return acquireLock(this) 
      .switchMap(obj => { 
       if(obj.error) { 
       return Rx.Observable.throw(obj.error); 
       } else { 
       Rx.Observable.of(obj); 
       } 
      }) 
      .retryWhen(...) 
    }) 

 Смежные вопросы

  • Нет связанных вопросов^_^