2016-11-03 10 views
0

Я довольно новичок в области реактивного программирования, но уже влюблен. Однако мне все еще сложно переключить мой мозг. Я стараюсь следовать всем рекомендациям как «Избегайте использования предметов» и «Избегайте нечистых функций» и, конечно, «Избегайте императивного кода».RxJS/ReactiveX Соответствующие модули связи

То, что мне трудно достичь, - это просто перекрестные коммуникации модулей, где один модуль может регистрировать «действие»/наблюдаемый, а другой может подписаться и реагировать на него. Простая шина сообщений, вероятно, будет работать, но это обеспечит использование объектов и стиля императивного кода, которые я пытаюсь избежать.

Так вот простой отправной точкой я играю с:

// some sandbox 
class Api { 
    constructor() { 
    this.actions = {}; 
    } 

    registerAction(actionName, action) { 
    // I guess this part will have to be changed 
    this.actions[actionName] = action.publishReplay(10).refCount(); 
    //this.actions[actionName].connect(); 
    } 

    getAction(actionName) { 
    return this.actions[actionName]; 
    } 
} 

const api = new Api(); 

// ------------------------------------------------------------------- 
// module 1 
let myAction = Rx.Observable.create((obs) => { 
    console.log("EXECUTING"); 
    obs.next("42 " + Date.now()); 
    obs.complete(); 
}); 

api.registerAction("myAction", myAction); 

let myTrigger = Rx.Observable.interval(1000).take(2); 

let executedAction = myTrigger 
.flatMap(x => api.getAction("myAction")) 
.subscribe(
    (x) => { console.log(`executed action: ${x}`); }, 
    (e) => {}, 
() => { console.log("completed");}); 

// ------------------------------------------------------------------- 
// module 2 
api.getAction("myAction") 
    .subscribe(
    (x) => { console.log(`SECOND executed action: ${x}`); }, 
    (e) => {}, 
() => { console.log("SECOND completed");}); 

Так в настоящее время в данный момент второй модуль при подписке он «запускает» в «MyAction» Наблюдаемый. И в сценарии реальной жизни, который может быть ajax-вызовом. Есть ли способ заставить всех подписчиков задерживать/ждать, пока «myAction» не будет вызван должным образом из модуля 1? И снова - его легко сделать, используя темы, но я пытаюсь сделать это, следуя рекомендациям.

+0

Что вы подразумеваете под * «myAction», называется надлежащим образом из модуля1 *? Вы имеете в виду, пока это не закончится или что? – martin

+0

Да. полный будет работать –

ответ

0

Итак, это гораздо более простое решение, чем тот, который я думал. Просто используя 2 наблюдаемых. Подобный эффект может быть достигнут с помощью планировщиков и subscribeOn.

// some sandbox 
class Action { 
    constructor(name, observable) { 
    this.name = name; 
    this.observable = observable; 
    this.replay = new Rx.ReplaySubject(10); 
    } 
} 

function actionFactory(action, param) { 

    return Rx.Observable.create(obs => { 
    action.observable 
    .subscribe(x => { 
     obs.next(x); 
     action.replay.next(x); 
    }, (e) => {},() => obs.complete); 
    }); 
} 

class Api { 
    constructor() { 
    this.actions = {}; 
    } 

    registerAction(actionName, action) { 
    let generatedAction = new Action(actionName, action); 

    this.actions[actionName] = generatedAction; 

    return actionFactory.bind(null, generatedAction); 
    } 

    getAction(actionName) { 
    return this.actions[actionName].replay; 
    } 
} 

const api = new Api(); 

// ------------------------------------------------------------------- 
// module 1 
let myAction = Rx.Observable.create((obs) => { 
    obs.next("42 " + Date.now()); 
    obs.complete(); 
}); 

let myRegisteredAction$ = api.registerAction("myAction", myAction); 

let myTrigger = Rx.Observable.interval(1000).take(1).delay(1000); 

let executedAction = myTrigger 
.map(x => { return { someValue: x} }) 
.concatMap(x => myRegisteredAction$(x)) 
.subscribe(
    (x) => { console.log(`MAIN: ${x}`); }, 
    (e) => { console.log("error", e)}, 
() => { console.log("MAIN: completed");}); 


// ------------------------------------------------------------------- 
// module 2 
var sub = api.getAction("myAction") 
    .subscribe(
    (x) => { console.log(`SECOND: ${x}`); }, 
    (e) => {console.log("error : " + e)}, 
() => { console.log("SECOND: completed");}); 
0

Если я правильно вас понимаю, вы хотите удостовериться, что если вы вызываете api.getAction, вы хотите, чтобы следующие значения в наблюдаемом состоянии дождались завершения вызова getAction. Перед использованием других значений.

Это то, что вы можете достичь довольно легко, используя concatMap. ConcatMap будет выполнять функцию, которая возвращает наблюдаемый (в вашем случае вызов getAction). ConcatMap будет ждать, чтобы начать обработку следующего значения, до тех пор, пока наблюдаемый, возвращаемый в функции, не завершится.

Так что если вы измените свой код, как это, он должен работать (если я правильно понял).

let executedAction = myTrigger 
.concatMap(x => api.getAction("myAction")) 
.subscribe(
    (x) => { console.log(`executed action: ${x}`); }, 
    (e) => {}, 
() => { console.log("completed");}); 

Если MyTrigger имеет новое значение, оно не будет обработано до тех пор, наблюдаемым вернулся из api.getAction завершается.

+0

Спасибо. Это улучшение, которое я рассматривал. Однако моя основная проблема заключается в том, что подписка от модуля 2 запускает излучение до того, как из модуля 1. Так что, я думаю, мне нужна какая-то задержка до тех пор, пока она не будет выпущена, а затем подписаться? Но насколько я вхожу в документацию, задержка может быть только временем (с использованием rxjs5) –

+0

Итак, вы хотите, чтобы вызов из модуля 1 завершился, прежде чем обращаться с модулем 2. Но вопрос в том, что если его вызов AJAX, вы хотите, чтобы вызов выполнялся дважды, или вы хотите, чтобы вызов второго модуля вызывал второй запрос? – KwintenP

+0

Я просто хочу, чтобы подписка с модуля 2 была только «прослушиваться» без " триггер "запрос –

 Смежные вопросы

  • Нет связанных вопросов^_^