2015-10-27 4 views
0

Я работаю над личным проектом, связанным с данными в реальном времени, используя Kefir (или Bacon.js, выберите ваш любимый) и добрался до точки, где мне нужно регистрировать данные в базу данных для добавления идентификатора, а затем передать объект с идентификатором по цепочке. Фактически вставка данных в базу данных (NeDB) не является проблемой, а скорее ее использованием обратных вызовов и продолжением выполнения, когда запись вставляется в базу данных и как обойти это поведение.Обработка потоков в Kefir/Bacon.js

Чрезмерно упрощенный пример:

Предположим, что у нас есть несколько устройств, демпинг анализируемые данные в автобус/бассейн:

function Position(data) { 
    this.id = null; 
    this.longitude = data.longitude; 
    this.latitude = data.latitude; 
} 

self.positionDataPool.map(function(position)) { // is this even what really needs to be done? 
    // unsure what to do here { 
     self.db.insert { 
      longitude: position.longitude 
      , latitude: position.latitude 
     }, function(e, newRecord) { 
      if(e) { ... } 
      , else { 
       position.id = newRecord._id; 
       return position; 
      } 
     } 
    //} 
}) 
.filter(function(position) { 
    // the position without an id is passed here 
    ... 
}); 

Я подозреваю, что это неправильное или несоответствующее использование функции карты, но после того, как я попробовал несколько вещей. Любые мысли, предложения или помощь будут высоко оценены.

мое решение

После этого тонну больше читать и экспериментировать (сверх того, что я уже сделал) и вернуться в дни моей работы с потоковой обработки на ежедневной основе, я пришел с следующее решение. Хотя это может быть не самым эффективным, это решение принимает входные данные из нескольких источников, подключая несколько источников событий в пул данных (не показан). Полностью новый поток создается для выполнения одной операции над объектом/данными. Хотя расширяемость не была целью здесь, это позволяет нескольким источникам наблюдать за данными, выходящими из потока, а не сбрасыванием их прямо в фильтр. Наконец, данные, поступающие из обработанного потока, фильтруются, чтобы отображать только те результаты, которые мы хотим.

self.savedPositionDataStream = Kefir.stream(function(emitter) { 
    self.positionDataPool.onValue(function(val) { 
     self.db.insert { 
      longitude: position.longitude 
      , latitude: position.latitude 
     }, function(e, newRecord) { 
      val.id = newRecord._id; 
      emitter.emit(val); 
     } 
    }); 
}); 

self.filteredPositionData = savedPositionDataStream.filter(...); 

ответ

2

По крайней мере, с Bacon.js, вы можете использовать Bacon.fromNodeCallback обернуть результат вставки вызова в поток. Как

Bacon.fromNodeCallback(self.db, "insert", dataToBeInserted) 

Конечно, Вы можете сделать это с Bacon.fromBinder или подобным Kefir.stream но fromNodeCallback помощника делает это проще, так как он автоматически обрабатывает значение успеха/ошибки и преобразует их в поток событий соответствующим образом.

И тогда flatMap вместо map, чтобы сделать вставку и обеспечить результаты в виде потока:

let insertionResultE = self.positionDataPool.flatMap(val => 
    Bacon.fromNodeCallback(self.db, "insert", val).map("._id") 
) 
insertionResultE.log("insertion result") 

Аналогичный подход применим к кефира, а также. Дело в том, что вы не можете делать асинхронные и, возможно, неудачные вычисления в map, но вы можете сделать это в flapMap.

Только вот, что вам нужно добавить хотя бы одного абонента для insertionResultE, чтобы активировать его. В приведенном выше примере log делает это.

+0

Спасибо за полный пример, используя fromNodeCallback, я попытался спуститься по этой дороге пару раз и пропустил что-то в этом процессе. К счастью, Кефир тоже поддерживает от NodeCallback, я вернусь и попробую это для удовольствия и посмотрю, смогу ли я получить от него лучшую производительность. – codeape