Я работаю над личным проектом, связанным с данными в реальном времени, используя Kefir (или Bacon.js, выберите ваш любимый) и добрался до точки, где мне нужно регистрировать данные в базу данных для добавления идентификатора, а затем передать объект с идентификатором по цепочке. Фактически вставка данных в базу данных (NeDB) не является проблемой, а скорее ее использованием обратных вызовов и продолжением выполнения, когда запись вставляется в базу данных и как обойти это поведение.Обработка потоков в Kefir/Bacon.js
Чрезмерно упрощенный пример:
Предположим, что у нас есть несколько устройств, демпинг анализируемые данные в автобус/бассейн:
function Position(data) {
this.id = null;
this.longitude = data.longitude;
this.latitude = data.latitude;
}
self.positionDataPool.map(function(position)) { // is this even what really needs to be done?
// unsure what to do here {
self.db.insert {
longitude: position.longitude
, latitude: position.latitude
}, function(e, newRecord) {
if(e) { ... }
, else {
position.id = newRecord._id;
return position;
}
}
//}
})
.filter(function(position) {
// the position without an id is passed here
...
});
Я подозреваю, что это неправильное или несоответствующее использование функции карты, но после того, как я попробовал несколько вещей. Любые мысли, предложения или помощь будут высоко оценены.
мое решение
После этого тонну больше читать и экспериментировать (сверх того, что я уже сделал) и вернуться в дни моей работы с потоковой обработки на ежедневной основе, я пришел с следующее решение. Хотя это может быть не самым эффективным, это решение принимает входные данные из нескольких источников, подключая несколько источников событий в пул данных (не показан). Полностью новый поток создается для выполнения одной операции над объектом/данными. Хотя расширяемость не была целью здесь, это позволяет нескольким источникам наблюдать за данными, выходящими из потока, а не сбрасыванием их прямо в фильтр. Наконец, данные, поступающие из обработанного потока, фильтруются, чтобы отображать только те результаты, которые мы хотим.
self.savedPositionDataStream = Kefir.stream(function(emitter) {
self.positionDataPool.onValue(function(val) {
self.db.insert {
longitude: position.longitude
, latitude: position.latitude
}, function(e, newRecord) {
val.id = newRecord._id;
emitter.emit(val);
}
});
});
self.filteredPositionData = savedPositionDataStream.filter(...);
Спасибо за полный пример, используя fromNodeCallback, я попытался спуститься по этой дороге пару раз и пропустил что-то в этом процессе. К счастью, Кефир тоже поддерживает от NodeCallback, я вернусь и попробую это для удовольствия и посмотрю, смогу ли я получить от него лучшую производительность. – codeape