2016-09-01 1 views
3

Скажем, у меня есть следующая асинхронная обратный вызов на основе «бесконечную» последовательность, которую я отменить через некоторое время:Преобразование бесконечной последовательности асинхронных обратных вызовов в наблюдаемую последовательность?

'use strict'; 

const timers = require('timers'); 

let cancelled = false; 

function asyncOperation(callback) { 
    const delayMsec = Math.floor(Math.random() * 10000) + 1; 
    console.log(`Taking ${delayMsec}msec to process...`); 
    timers.setTimeout(callback, delayMsec, null, delayMsec); 
} 

function cancellableSequence(callback) { 
    asyncOperation((error, processTime) => { 
    console.log('Did stuff'); 
    if (!cancelled) { 
     process.nextTick(() => { cancellableSequence(callback); }); 
    } else { 
     callback(null, processTime); 
    } 
    }); 
} 

cancellableSequence((error, lastProcessTime) => { 
    console.log('Cancelled'); 
}); 

timers.setTimeout(() => { cancelled = true; }, 0); 

asyncOperation будет выполнять и перезвонить по крайней мере один раз, и сообщение отмены не будет отображаться сразу , но после завершения asyncOperation. Количество вызовов asyncOperation зависит от внутреннего значения delayMsec и аргумента задержки, переданного в конец setTimeout() (попытка показать, что они являются переменными).

Я начинаю изучать RxJS5 и думаю, что можно было бы преобразовать это в наблюдаемую последовательность («oooh, подписка Observable может быть отписана() d - это выглядит аккуратно!»).

Однако мои попытки превратить cancellableSequence в генератор ES6 (как еще сделать бесконечный?), Давая Observable.bindNodeCallback(asyncOperation)(), привели к немедленным выходам, что в моем случае является нежелательным поведением.

Я не могу использовать Observable.delay() или Observable.timer(), так как у меня нет известного согласованного интервала. (The Math.random (...) в asyncOperation была попытка показать, что я как абонент не контролировать время, и обратный вызов происходит «через некоторое неизвестное время.»)

Моя неудачная попытка:

'use strict'; 

const timers = require('timers'); 
const Rx = require('rxjs/Rx'); 

function asyncOperation(callback) { 
    const delayMsec = Math.floor(Math.random() * 10000) + 1; 
    console.log(`Taking ${delayMsec}msec to process...`); 
    timers.setTimeout(callback, delayMsec, null, delayMsec); 
} 

const operationAsObservable = Rx.Observable.bindNodeCallback(asyncOperation); 
function* generator() { 
    while (true) { 
    console.log('Yielding...'); 
    yield operationAsObservable(); 
    } 
} 

Rx.Observable.from(generator()).take(2).mergeMap(x => x).subscribe(
    x => console.log(`Process took: ${x}msec`), 
    e => console.log(`Error: ${e}`), 
    c => console.log('Complete') 
) 

Какие результаты есть выход:

Yielding... 
Taking 2698msec to process... 
Yielding... 
Taking 2240msec to process... 
Process took: 2240msec 
Process took: 2698msec 
Complete 

выход происходит сразу же. Выход Process took: xxx возникает, когда вы ожидаете (после 2240 и 2698 мс соответственно).

(По всей видимости, причина, по которой я беспокоюсь о задержке между доходностями, заключается в том, что asyncOperation() здесь фактически представляет собой библиотеку токенов-маркеров, ограничивающую скорость, которая контролирует скорость асинхронных обратных вызовов - реализацию, которую я хотел бы сохранить .)

Как и в сторону, я попытался заменить take(2) с задержкой отмены, но это никогда не произошло:

const subscription = Rx.Observable.from(generator()).mergeMap(x => x).subscribe(
    x => console.log(`Process took: ${x}msec`), 
    e => console.log(`Error: ${e}`), 
    c => console.log('Complete') 
) 

console.log('Never gets here?'); 
timers.setTimeout(() => { 
    console.log('Cancelling...'); 
    subscription.unsubscribe(); 
}, 0); 

Может то, что я пытаюсь быть достигнуто с помощью подписки досрочного RxJS? (Я вижу другие подходы, например process.exec('node', ...), для запуска asyncOperation() как отдельный процесс, дающий мне возможность process.kill(..) и т. Д., Но давайте не будем туда ...).

Является ли моя первоначальная реализация на основе обратного вызова предлагаемым способом реализации отменой последовательности?

ОБНОВЛЕНО РЕШЕНИЕ:

Смотрите мой ответ на комментарий @ user3743222 ответим ниже.Вот что я закончил с (заменить ES6 генератора с Observable.expand()):

'use strict'; 

const timers = require('timers'); 
const Rx = require('rxjs/Rx'); 

function asyncOperation(callback) { 
    const delayMsec = Math.floor(Math.random() * 10000) + 1; 
    console.log(`Taking ${delayMsec}msec to process...`); 
    timers.setTimeout(callback, delayMsec, null, delayMsec); 
} 

const operationAsObservable = Rx.Observable.bindNodeCallback(asyncOperation); 

const subscription = Rx.Observable 
    .defer(operationAsObservable) 
    .expand(x => operationAsObservable()) 
    .subscribe(
    x => console.log(`Process took: ${x}msec`), 
    e => console.log(`Error: ${e}`), 
    c => console.log('Complete') 
); 

subscription.add(() => { 
    console.log('Cancelled'); 
}); 

timers.setTimeout(() => { 
    console.log('Cancelling...'); 
    subscription.unsubscribe(); 
}, 0); 

ОБНОВЛЕНО РЕШЕНИЕ 2:

Вот что я придумал для альтернативного RxJS4 repeatWhen() подхода:

'use strict'; 

const timers = require('timers'); 
const Rx = require('rx'); 

function asyncOperation(callback) { 
    const delayMsec = Math.floor(Math.random() * 1000) + 1; 
    console.log(`Taking ${delayMsec}msec to process...`); 
    timers.setTimeout(callback, delayMsec, null, delayMsec); 
} 

const operationAsObservable = Rx.Observable.fromNodeCallback(asyncOperation); 

const subscription = Rx.Observable 
    .defer(operationAsObservable) 
    .repeatWhen(x => x.takeWhile(y => true)) 
    .subscribe(
    x => console.log(`Process took: ${x}msec`), 
    e => console.log(`Error: ${e}`), 
    c => console.log('Complete') 
); 

timers.setTimeout(() => { 
    console.log('Cancelling...'); 
    subscription.dispose(); 
}, 10000); 
+0

Что такое проблема с первым подходом? – guest271314

ответ

0

You кажется, повторяют действие каждый раз, когда он заканчивается. Это выглядит как хороший вариант использования для expand или repeatWhen.

Как правило, это было бы что-то вроде:

Rx.Observable.just(false).expand(_ => { 
    return cancelled ? Rx.Observable.empty() : Rx.Observable.fromCallback(asyncAction) 
}) 

Вы кладете cancelled истину в любой момент времени, и когда текущее действие заканчивается, он останавливает цикл. Не проверял его, поэтому мне было бы интересно узнать, работает ли это в конце.

Вы можете посмотреть на подобные вопросы о опроса:

Документация:

Ссылки на документацию предназначены для Rxjs 4, но изменений не должно быть vs v5

+0

Использование Observable.timer() или Observable.delay() (ключ к решению в вашем предоставленном URL-адресе) не было тем, что я хотел, поскольку они перемещают время на абонента/вызывающего абонента, и я имею дело с 'asyncOperation' который не может измениться (это налагает время). Однако Observable.expand() определенно дает мне то, что я хочу. Я обновляю свой пост, чтобы показать свое окончательное решение. Благодаря! –

+0

В ссылке, которую я разместил, есть два ответа. Один с 'repeatWhen', один с' expand'. Присмотритесь, вероятно, вы читаете только вопрос. Счастлив, что вы все равно решили свою проблему. – user3743222

+0

Я не следую. Я прочитал весь поток, и первый ответ с использованием 'repeatWhen()' внутренне использует 'delay()'. В контексте этого вопроса моя задержка будет происходить внутренне для выполнения метода 'action'. –

 Смежные вопросы

  • Нет связанных вопросов^_^