2017-01-04 2 views
0

У меня есть два метода в очереди. Я реализовал некоторую форму противодавления, при которой наблюдаемые, созданные из методов, будут только запускать события, если пользователь запускает обратный вызов, все через наблюдаемые. Проблема в том, что я не могу заставить обработчик onCompleted в главном подписчике на drain(). Что меня удивляет, так это то, что onNext будет срабатывать для одного и того же абонента, так почему не будет onCompleted огонь? Я думаю, что между вызовом takeUntil и тяжелым рукой $ obs.complete(), что обработчик OnCompleted в подписчике уволит ...RxJS5 => абонент onCompleted callback not firing

Queue.prototype.isEmpty = function (obs) { 

    if (!obs) { 
     // this is just a dummy observable 
     // I wish Rx had Rx.Observable.dummy() alongside 
     // Rx.Observable.empty(), but oh well 
     obs = Rx.Observable.of('dummy'); 
    } 

    return this.init() 
     .flatMap(() => { 
      return obs; // when you call obs.next(), it should fire this chain again 
     }) 
     .flatMap(() => { 
      return acquireLock(this) 
       .flatMap(obj => { 
        return acquireLockRetry(this, obj) 
       }) 
     }) 
     .flatMap(obj => { 
      return findFirstLine(this) 
       .flatMap(l => { 
        return releaseLock(this, obj.id) 
         .map(() => { 
          console.log(' => LLLL1 => ', l); 
          return l; 
         }); 
       }); 
     }) 
     .filter(l => { 
      // filter out any lines => only fire event if there is no line 

      return !l; 
     }) 
     .map(() => { 
      // the queue is now empty 
      obs.complete(); // <<<<<<<<<< note this call 
      return {isEmpty: true} 
     }); 


}; 


Queue.prototype.drain = function (obs, opts) { 

    opts = opts || {}; 

    const isConnect = opts.isConnect || false; 
    const delay = opts.delay || 500; 

    let $obs = obs.takeUntil(this.isEmpty(obs)) 
     .flatMap(() => { 
      return this.init(); 
     }) 
     .flatMap(() => { 
      return acquireLock(this) 
       .flatMap(obj => { 
        return acquireLockRetry(this, obj) 
       }); 
     }) 
     .flatMap(obj => { 
      return removeOneLine(this) 
       .flatMap(l => { 
        return releaseLock(this, obj.id) 
         .map(() => l); 
       }); 
     }); 


    process.nextTick(function(){ 
     obs.next('foo foo foo'); 
     $obs.next('bar bar bar'); 
     $obs.complete(); 
    }); 


    return $obs; 

}; 

Что является движущей силой абсолютных помешанных, что я не могу получить OnCompleted Обратный вызов к огню, когда я называю выше примерно так:

const q = new Queue(); 

const obs = new Rx.Subject(); 

q.drain(obs).subscribe(

    function (v) { 

     console.log('end result => ', colors.yellow(util.inspect(v))); 

     setTimeout(function() { 
      // the following call serves as the callback which will fire the observables in the methods again 
      obs.next(); 
     }, 100); 

    }, 
    function (e) { 
     console.log('on error => ', e); 
    }, 
    function (c) { 
     // this never gets called and it is driving me f*cking crazy 
     console.log(colors.red(' DRAIN on completed => '), c); 
    } 

); 

obs.subscribe(
    function (v) { 
     console.log('next item that was drained => ', v); 
    }, 
    function (e) { 
     console.log('on error => ', e); 
    }, 
    function (c) { 
     // this gets called! 
     console.log(colors.red(' => obs on completed => '), c); 
    } 
); 

когда я называю выше, я просто получаю это:

next item that was drained => foo foo foo 
next item that was drained => bar bar bar 
=> obs on completed => undefined 

причина я просто получить эти 3 линии, потому что я d о этом:

process.nextTick(function(){ 
    obs.next('foo foo foo'); 
    $obs.next('bar bar bar'); 
    $obs.complete(); 
}); 

но почему не явного вызова $obs.complete(); Этот вызов:

function (c) { 
      // this never gets called and it is driving me f*cking crazy 
      console.log(colors.red(' DRAIN on completed => '), c); 
     } 

?

ответ

0

Хорошо, я думаю, что я понял это то, что сумасшедшая библиотека это RxJS

Скорее всего, чтобы сделать все правильно, то вы, вероятно, следует использовать взять() или takeUntil() или аналогичный

Я сделал это:

Queue.prototype.drain = function (obs, opts) { 

    if (!(obs instanceof Rx.Observable)) { 
     opts = obs || {}; 
     obs = new Rx.Subject(); 
    } 
    else { 
     opts = opts || {}; 
    } 


    const isConnect = opts.isConnect || false; 
    const delay = opts.delay || 500; 

    process.nextTick(function() { 
     obs.next(); 
    }); 


    let $obs = obs 
     .flatMap(() => { 
      return this.init(); 
     }) 
     .flatMap(() => { 
      return acquireLock(this) 
       .flatMap(obj => { 
        return acquireLockRetry(this, obj) 
       }); 
     }) 
     .flatMap(obj => { 
      return removeOneLine(this) 
       .flatMap(l => { 
        return releaseLock(this, obj.id) 
         .map(() => ({data: l, cb: obs.next.bind(obs)})); 
       }); 
     }) 
     // here is the key part! 
     .takeUntil(this.isEmpty(obs)); 


    return $obs; 

}; 

это, похоже, сделал трюк. Некоторое время я был довольно безнадежен. Если вы хотите получить более подробное объяснение того, как это работает, обратитесь к нам.