У меня есть два метода в очереди. Я реализовал некоторую форму противодавления, при которой наблюдаемые, созданные из методов, будут только запускать события, если пользователь запускает обратный вызов, все через наблюдаемые. Проблема в том, что я не могу заставить обработчик onCompleted в главном подписчике на drain()
. Что меня удивляет, так это то, что onNext будет срабатывать для одного и того же абонента, так почему не будет onCompleted огонь? Я думаю, что между вызовом takeUntil и тяжелым рукой $ obs.complete(), что обработчик OnCompleted в подписчике уволит ...RxJS5 => абонент onCompleted callback not firing
Queue.prototype.isEmpty = function (obs) {
if (!obs) {
// this is just a dummy observable
// I wish Rx had Rx.Observable.dummy() alongside
// Rx.Observable.empty(), but oh well
obs = Rx.Observable.of('dummy');
}
return this.init()
.flatMap(() => {
return obs; // when you call obs.next(), it should fire this chain again
})
.flatMap(() => {
return acquireLock(this)
.flatMap(obj => {
return acquireLockRetry(this, obj)
})
})
.flatMap(obj => {
return findFirstLine(this)
.flatMap(l => {
return releaseLock(this, obj.id)
.map(() => {
console.log(' => LLLL1 => ', l);
return l;
});
});
})
.filter(l => {
// filter out any lines => only fire event if there is no line
return !l;
})
.map(() => {
// the queue is now empty
obs.complete(); // <<<<<<<<<< note this call
return {isEmpty: true}
});
};
Queue.prototype.drain = function (obs, opts) {
opts = opts || {};
const isConnect = opts.isConnect || false;
const delay = opts.delay || 500;
let $obs = obs.takeUntil(this.isEmpty(obs))
.flatMap(() => {
return this.init();
})
.flatMap(() => {
return acquireLock(this)
.flatMap(obj => {
return acquireLockRetry(this, obj)
});
})
.flatMap(obj => {
return removeOneLine(this)
.flatMap(l => {
return releaseLock(this, obj.id)
.map(() => l);
});
});
process.nextTick(function(){
obs.next('foo foo foo');
$obs.next('bar bar bar');
$obs.complete();
});
return $obs;
};
Что является движущей силой абсолютных помешанных, что я не могу получить OnCompleted Обратный вызов к огню, когда я называю выше примерно так:
const q = new Queue();
const obs = new Rx.Subject();
q.drain(obs).subscribe(
function (v) {
console.log('end result => ', colors.yellow(util.inspect(v)));
setTimeout(function() {
// the following call serves as the callback which will fire the observables in the methods again
obs.next();
}, 100);
},
function (e) {
console.log('on error => ', e);
},
function (c) {
// this never gets called and it is driving me f*cking crazy
console.log(colors.red(' DRAIN on completed => '), c);
}
);
obs.subscribe(
function (v) {
console.log('next item that was drained => ', v);
},
function (e) {
console.log('on error => ', e);
},
function (c) {
// this gets called!
console.log(colors.red(' => obs on completed => '), c);
}
);
когда я называю выше, я просто получаю это:
next item that was drained => foo foo foo
next item that was drained => bar bar bar
=> obs on completed => undefined
причина я просто получить эти 3 линии, потому что я d о этом:
process.nextTick(function(){
obs.next('foo foo foo');
$obs.next('bar bar bar');
$obs.complete();
});
но почему не явного вызова $obs.complete();
Этот вызов:
function (c) {
// this never gets called and it is driving me f*cking crazy
console.log(colors.red(' DRAIN on completed => '), c);
}
?