Скажем, у меня есть следующая асинхронная обратный вызов на основе «бесконечную» последовательность, которую я отменить через некоторое время:Преобразование бесконечной последовательности асинхронных обратных вызовов в наблюдаемую последовательность?
'use strict';
const timers = require('timers');
let cancelled = false;
function asyncOperation(callback) {
const delayMsec = Math.floor(Math.random() * 10000) + 1;
console.log(`Taking ${delayMsec}msec to process...`);
timers.setTimeout(callback, delayMsec, null, delayMsec);
}
function cancellableSequence(callback) {
asyncOperation((error, processTime) => {
console.log('Did stuff');
if (!cancelled) {
process.nextTick(() => { cancellableSequence(callback); });
} else {
callback(null, processTime);
}
});
}
cancellableSequence((error, lastProcessTime) => {
console.log('Cancelled');
});
timers.setTimeout(() => { cancelled = true; }, 0);
asyncOperation
будет выполнять и перезвонить по крайней мере один раз, и сообщение отмены не будет отображаться сразу , но после завершения asyncOperation
. Количество вызовов asyncOperation
зависит от внутреннего значения delayMsec
и аргумента задержки, переданного в конец setTimeout()
(попытка показать, что они являются переменными).
Я начинаю изучать RxJS5 и думаю, что можно было бы преобразовать это в наблюдаемую последовательность («oooh, подписка Observable может быть отписана() d - это выглядит аккуратно!»).
Однако мои попытки превратить cancellableSequence
в генератор ES6 (как еще сделать бесконечный?), Давая Observable.bindNodeCallback(asyncOperation)()
, привели к немедленным выходам, что в моем случае является нежелательным поведением.
Я не могу использовать Observable.delay()
или Observable.timer()
, так как у меня нет известного согласованного интервала. (The Math.random (...) в asyncOperation
была попытка показать, что я как абонент не контролировать время, и обратный вызов происходит «через некоторое неизвестное время.»)
Моя неудачная попытка:
'use strict';
const timers = require('timers');
const Rx = require('rxjs/Rx');
function asyncOperation(callback) {
const delayMsec = Math.floor(Math.random() * 10000) + 1;
console.log(`Taking ${delayMsec}msec to process...`);
timers.setTimeout(callback, delayMsec, null, delayMsec);
}
const operationAsObservable = Rx.Observable.bindNodeCallback(asyncOperation);
function* generator() {
while (true) {
console.log('Yielding...');
yield operationAsObservable();
}
}
Rx.Observable.from(generator()).take(2).mergeMap(x => x).subscribe(
x => console.log(`Process took: ${x}msec`),
e => console.log(`Error: ${e}`),
c => console.log('Complete')
)
Какие результаты есть выход:
Yielding...
Taking 2698msec to process...
Yielding...
Taking 2240msec to process...
Process took: 2240msec
Process took: 2698msec
Complete
выход происходит сразу же. Выход Process took: xxx
возникает, когда вы ожидаете (после 2240 и 2698 мс соответственно).
(По всей видимости, причина, по которой я беспокоюсь о задержке между доходностями, заключается в том, что asyncOperation()
здесь фактически представляет собой библиотеку токенов-маркеров, ограничивающую скорость, которая контролирует скорость асинхронных обратных вызовов - реализацию, которую я хотел бы сохранить .)
Как и в сторону, я попытался заменить take(2)
с задержкой отмены, но это никогда не произошло:
const subscription = Rx.Observable.from(generator()).mergeMap(x => x).subscribe(
x => console.log(`Process took: ${x}msec`),
e => console.log(`Error: ${e}`),
c => console.log('Complete')
)
console.log('Never gets here?');
timers.setTimeout(() => {
console.log('Cancelling...');
subscription.unsubscribe();
}, 0);
Может то, что я пытаюсь быть достигнуто с помощью подписки досрочного RxJS? (Я вижу другие подходы, например process.exec('node', ...)
, для запуска asyncOperation()
как отдельный процесс, дающий мне возможность process.kill(..)
и т. Д., Но давайте не будем туда ...).
Является ли моя первоначальная реализация на основе обратного вызова предлагаемым способом реализации отменой последовательности?
ОБНОВЛЕНО РЕШЕНИЕ:
Смотрите мой ответ на комментарий @ user3743222 ответим ниже.Вот что я закончил с (заменить ES6 генератора с Observable.expand()
):
'use strict';
const timers = require('timers');
const Rx = require('rxjs/Rx');
function asyncOperation(callback) {
const delayMsec = Math.floor(Math.random() * 10000) + 1;
console.log(`Taking ${delayMsec}msec to process...`);
timers.setTimeout(callback, delayMsec, null, delayMsec);
}
const operationAsObservable = Rx.Observable.bindNodeCallback(asyncOperation);
const subscription = Rx.Observable
.defer(operationAsObservable)
.expand(x => operationAsObservable())
.subscribe(
x => console.log(`Process took: ${x}msec`),
e => console.log(`Error: ${e}`),
c => console.log('Complete')
);
subscription.add(() => {
console.log('Cancelled');
});
timers.setTimeout(() => {
console.log('Cancelling...');
subscription.unsubscribe();
}, 0);
ОБНОВЛЕНО РЕШЕНИЕ 2:
Вот что я придумал для альтернативного RxJS4 repeatWhen()
подхода:
'use strict';
const timers = require('timers');
const Rx = require('rx');
function asyncOperation(callback) {
const delayMsec = Math.floor(Math.random() * 1000) + 1;
console.log(`Taking ${delayMsec}msec to process...`);
timers.setTimeout(callback, delayMsec, null, delayMsec);
}
const operationAsObservable = Rx.Observable.fromNodeCallback(asyncOperation);
const subscription = Rx.Observable
.defer(operationAsObservable)
.repeatWhen(x => x.takeWhile(y => true))
.subscribe(
x => console.log(`Process took: ${x}msec`),
e => console.log(`Error: ${e}`),
c => console.log('Complete')
);
timers.setTimeout(() => {
console.log('Cancelling...');
subscription.dispose();
}, 10000);
Что такое проблема с первым подходом? – guest271314