2015-05-20 2 views
2

Я использую излучатели событий в качестве примитивов синхронизации. Например, у меня есть один класс, который спрашивает семафор как структуру в Redis. Если семафор установлен, он испускает событие. Код указан ниже:Переписать событие эмиттера с реактивным программированием (пример семафора)

var redis = require("redis"), 
    async = require('async'), 
    client = redis.createClient(), 
    assert = require('assert'), 
    EventEmitter = require('events').EventEmitter; 
    util = require('util'); 
var isMaster = false, 
    SEMAPHORE_ADDRESS = 'semaphore'; 
var SemaphoreAsker = function() { 
    var self = this; 
    var lifeCycle = function (next) { 
    client.set([SEMAPHORE_ADDRESS, true, 'NX', 'EX', 5], function(err, val) { 
     console.log('client'); 
     if(err !== null) { throw err; } 
     if(val === 'OK') { 
     self.emit('crown'); 
     } else { 
     console.log('still a minion'); 
     } 
    }); 
    }; 
    async.forever(
    function(next) { 
     setTimeout(
     lifeCycle.bind(null, next), 
     1000 
    ); 
    } 
); 
}; 
util.inherits(SemaphoreAsker, EventEmitter); 
(new SemaphoreAsker()).on('crown', function() { 
    console.log('I`m master'); 
}); 

Это работает, но выглядит немного тяжелым. Можно ли переписать пример с помощью BaconJS (RxJS/whateverRPlibrary)?

+1

Прохладный вопрос. Я отправлю ответ RxJs для сравнения, когда вернусь домой сегодня вечером. Я думаю, что это немного короче, чем ответ отправленного бекона. – Brandon

ответ

3

Следующая должны работать в RXJS:

var callback = Rx.Observable.fromNodeCallback(client.set, client); 

var source = Rx.Observable.interval(1000) 
.selectMany(function() { 
    return callback([SEMAPHORE_ADDRESS, true, 'NX', 'EX', 5]); 
}) 
.filter(function(x) { return x === 'OK'; }) 
.take(1); 


source.subscribe(function(x) { 
    console.log("I am master"); 
}); 

Если вы готовы дополнительно включать rx-node модуль, который вы также можете сохранить ваше событие эмиттер структуры с помощью

var emitter = RxNode.toEventEmitter(source, 'crown'); 

emitter.on('crown', function(){}); 
emitter.on('error', function(){}); 
emitter.on('end', function(){}); 
+1

Да, это в значительной степени то, что я собираюсь опубликовать, за исключением: 'callback = Rx.Observable.fromNodeCallback (client.set, client);' вместо использования 'fromCallback'. [fromNodeCallback] (https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/fromnodecallback.md) заботится об обработке ошибок для вас. – Brandon

+0

Приятный звонок @Brandon, я обновил пример соответственно. – paulpdaniels

2

Я использовал базовый Bacon.fromBinder для создания пользовательского потока для этого. Без рабочего примера это немного догадки, но, надеюсь, это поможет вам.

var redis = require("redis"), 
    client = redis.createClient(), 
    assert = require('assert'), 
    Bacon = require('bacon'); 

var SEMAPHORE_ADDRESS = 'semaphore'; 

var SemaphoreAsker = function() { 
    return Bacon.fromBinder(function (sink) { 
    var intervalId = setInterval(pollRedis, 1000) 

    return function unsubscribe() { 
     clearInterval(intervalId) 
    } 

    function pollRedis() { 
     client.set([SEMAPHORE_ADDRESS, true, 'NX', 'EX', 5], function(err, val) { 
     if(err !== null) { sink(new Bacon.Error(err)) } 
     else if(val === 'OK') { sink(new Bacon.Next('crown')) 
     else { assert.fail(); } 
     } 
    } 
    }) 
} 

SemaphoreAsker().take(1).onValue(function() { 
    console.log("I am master") 
}) 
+0

Спасибо за отличный ответ! Что делает (1) в этом случае? Остановляет ли он поток после первого значения? – kharandziuk

+0

Теперь, я понимаю. Существует функция отмены подписки, и она будет вызывать ее после первого значения – kharandziuk

+1

Точно, нас интересует только одно значение «ОК». После первого значения поток, созданный с помощью «Bacon.fromBinder», больше не остается абонентов, поэтому вызывается функция отмены подписки. – OlliM

2

The @ paulpdanies отвечают, но переписываются с Бэконом:

var source = Bacon.interval(1000).flatMap(function() { 
    return Bacon.fromNodeCallback(
     client, 'set', [SEMAPHORE_ADDRESS, true, 'NX', 'EX', 1] 
    ); 
}) 
.filter(function(x) { return x === 'OK'; }) 
.take(1); 

source.onValue(function(x) { 
    console.log(x); 
}); 
+1

Это выглядит отлично, вы можете использовать 'Bacon.fromNodeCallback (client, 'set', [...])', чтобы избежать явной привязки – OlliM