2017-02-16 11 views
-2

Я разрабатываю приложение nodejs (используя функцию экспресс), которое будет обслуживать вызовы REST от клиентов через HTTP. Один из API REST обработает запрос POST, который будет принимать данные из тела POST и опубликовать его через клиент MQTT (запущенный как часть приложения).Как синхронизировать отдельные обратные вызовы в Javascript

Отдельное приложение, которое подписано на эту тему (через посредника MQTT, с которым мы оба подключились), я опубликовал, чтобы получить сообщение и ответить, опубликовав сообщение в теме, на которую подписался клиент MQTT моей заявки (что вызовет обратный вызов для запуска в моем приложении Javascript).

Я хочу, чтобы иметь возможность вернуть сообщение MQTT, получаемое моим приложением в ответ POST, обрабатываемое моим приложением.

В обычных терминах программирования на языке C ... Существует поток, обрабатывающий API REST и отдельную обработку потоков, получающую сообщения MQTT (например, два разных сокета). Я хотел бы заблокировать поток, обрабатывающий поток обработки POST, на семафоре, пока поток MQTT не сможет получить некоторые данные, поставить его в очередь и освободить семафор, чтобы разблокировать поток обработки POST (который затем удалит сообщение и вернет его в POST-ответ).

После мастерить с различными модулями и Promise/Генераторы это не ясно мне, как заставить его работать ...

Что такое «JavaScript путь», чтобы сделать это?

TIA!

+0

Вопросы о кодексе ДОЛЖНЫ включать ваш фактический код. – jfriend00

ответ

1

Вы не блокируете узел node.js. По своей конструкции node.js управляется событиями, и все сетевые операции ввода-вывода (и большинство операций ввода-вывода) не являются блокирующими и асинхронными. Таким образом, вам нужно координировать несколько асинхронных операций и получать уведомления (с каким-то событием), когда они завершатся.

Трудно быть очень конкретным, если вы не разделили ни один из ваших кодов, но основным инструментом для координации асинхронных операций в наши дни является обещание. Таким образом, если вы сделаете каждую из своих операций async, верните обещание, которое будет разрешено, когда операция async будет завершена или будет отменена, когда операция async обнаружит ошибку, вы можете использовать Promise.all() с двумя обещаниями, и она сообщит вам, когда и асинхронная операция закончили и предложили вам оба результата.

Основная идея заключается в следующем:

let p1 = asyncOperation1(...); 
let p2 = asyncOperation2(...); 

Promise.all([p1, p2]).then(results => { 
    // both async operations are done here 
    // results[0] and results[1] contain the resolved value of each of the two promises 
}).catch(err => { 
    // process error here 
}); 

Для получения обещания от операций REST, существует целый ряд различных решений варьируется от вручную делать свой собственный обещание обертку вокруг операций, для модулей, оберните модуль запроса предлагать обещания библиотекам, таким как Bluebird, которые имеют .promisify() и .promisifyAll() методы автоматического переноса операций для вас. Чтобы предлагать более конкретную информацию о том, как получить обещания от ваших асинхронных операций, вам придется поделиться фактическим кодом.


Поскольку, похоже, вы новичок здесь, позвольте мне предложить совет по проводке. Если вы покажете нам свой код, мы ВСЕГДА можем предложить более полные и более релевантные ответы. Когда вы не показываете нам свой код, вы, по сути, просите учебник общего назначения, который намного сложнее написать, и намного сложнее убедиться, что он охватывает ваш конкретный вариант использования.

0

Не блокируйте, не включайте уникальный идентификатор в сообщение, которое вы публикуете, и в него также включается этот идентификатор.

Затем вы сохраняете объект экспресс-ответа в объекте, используя этот идентификатор в качестве ключа, поэтому, когда ответ приходит и отправляет ответ.

Вы также должны указать временную метку, чтобы вы могли регулярно проходить объекты ожидания и отвечать, если сообщение MQTT не приходит вовремя.

var onGoingRequests = {}; 
var id = 0; 

mqttClient.on('message',function(topic,message){ 
    var payload = JSON.parse(message.toString()); 
    var details = onGoingRequest[payload.id]; 
    if (details) { 
    details.response.status(200).send(details.body); 
    delete onGoingRequests[payload.id]; 
    } else { 
    //response too late 
    } 
}); 

app.post('/foo', function(req, resp) { 
    var message = { 
     id: 'foo' +id++, 
     body: req.body 
    } 
    var topic = 'request/foo'; 
    mqttClient.publish(topic, JSON.stringify(message)); 
    onGoingRequests[message.id] = { 
     response: resp, 
     timestamp: Date.now(), 
    }; 
}); 

var timeout = setInterval(function(){ 
    var now = Date.now(); 
    var keys = Object.keys(onGoingCommands); 
    for (key in keys){ 
    var waiting = onGoingCommands[keys[key]]; 
    if (waiting) { 
     var diff = now - waiting.timestamp; 
     if (diff < timeout) { 
     waiting.res.status(504).send('{"error": "timeout"}'); 
     delete onGoingCommands[keys[key]]; 
     } 
    } 
    } 
}, 500); 
1

В конце концов поняли, что я, возможно, был более думать решение ...

В обработке POST обратного вызова я епдиеий объект ответа и возврат. Позже, в обратном вызове обработки сообщения подписки MQTT, я деактивирую ожидающий объект ответа и использую его для отправки правильного ответа на ожидающую транзакцию POST.

+0

Звучит удивительно знакомо ... – hardillb