2017-02-06 14 views
0

У меня есть sourceStream, состоящий из BaseData объектов.Как разбить поток, используя highland.js?

Я хочу разветвить этот поток на n -отображение различных потоков, которые затем фильтруют и преобразуют каждый объект BaseData по своему вкусу.

В конце концов, я хочу, чтобы потоки n содержали только определенный тип, а разветвленные потоки могут различаться по длине, так как данные могут быть отброшены или добавлены в будущем.

Я думал, что я мог бы установить его с помощью этого fork:

import * as _ from 'highland'; 

interface BaseData { 
    id: string; 
    data: string; 
} 

const sourceStream = _([ 
    {id: 'foo', data: 'poit'}, 
    {id: 'foo', data: 'fnord'}, 
    {id: 'bar', data: 'narf'}]); 

const partners = [ 
    'foo', 
    'bar', 
]; 

partners.forEach((partner: string) => { 
    const partnerStream = sourceStream.fork(); 

    partnerStream.filter((baseData: BaseData) => { 
     return baseData.id === partner; 
    }); 

    partnerStream.each(console.log); 
}); 

Я ожидал теперь есть два потока, и foo -stream содержит два элемента:

{ id: 'foo', data: 'poit' } 
{ id: 'foo', data: 'fnord' } 

и bar - для включения одного элемента:

{ id: 'bar', data: 'narf' } 

Тем не менее, я получаю сообщение об ошибке:

/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338 
     throw new Error(
     ^

Error: Stream already being consumed, you must either fork() or observe() 
    at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15) 
    at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10) 
    at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18) 
    at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19) 
    at Array.forEach (native) 
    at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10) 
    at Module._compile (module.js:570:32) 
    at Object.Module._extensions..js (module.js:579:10) 
    at Module.load (module.js:487:32) 
    at tryModuleLoad (module.js:446:12) 

Как разбить поток в несколько потоков?


Я также попытался цепочки вызовов, но я только получить обратно результат один поток в:

partners.forEach((partner: string) => { 
    console.log(partner); 
    const partnerStream = sourceStream 
     .fork() 
     .filter((item: BaseData) => { 
      return item.id === partner; 
     }); 

    partnerStream.each((item: BaseData) => { 
     console.log(item); 
    }); 
}); 

Печать только:

foo 
{ id: 'foo', data: 'poit' } 
{ id: 'foo', data: 'fnord' } 
bar 

Вместо ожидаемого:

foo 
{ id: 'foo', data: 'poit' } 
{ id: 'foo', data: 'fnord' } 
bar 
{id: 'bar', data: 'narf'} 

Возможно, это был так, что я не понял, был fork. Согласно its doc entry:

Stream.fork() Форкс поток, что позволяет добавлять дополнительные потребитель с общим противодавлением. Поток, раздвоенный для нескольких потребителей, будет только вытащить значения из своего источника так быстро, как самый медленный потребитель может справиться с ними.

ПРИМЕЧАНИЕ. Не следует зависеть от согласованного порядка выполнения между вилками. Это преобразование гарантирует, что все вилки обработают значение foo , прежде чем кто-либо обработает вторую строку значений. Он не гарантирует порядок , в котором вилка обрабатывает foo.

СОВЕТ. Будьте осторожны с изменением значений потока в вилках (или с использованием библиотеки, которая делает это). Поскольку одно и то же значение будет передано на , каждая вилка, изменения, сделанные в одной вилке, будут видны в любой вилке, которая после него выполняет . Добавьте к этому непоследовательный порядок выполнения и вы можете получить тонкие ошибки повреждения данных.Если вам нужно изменить любые значения, вы должны сделать копию и изменить копию вместо этого.

Предупреждение об изъятии: в настоящее время можно разблокировать поток после того, как потребляет его (например, через преобразование). Это станет невозможным в следующем крупном выпуске. Если вы собираетесь разбить поток, всегда вызовите fork на нем.

Так что вместо «Как разбить поток?» мой фактический вопрос может быть следующим: как дублировать поток горных на лету в разные потоки?

ответ

0

Нужно иметь в виду, что нельзя использовать разветвленный поток перед созданием всех вилок. Как если бы кто-то потреблял разветвленный поток, он и его «родитель» будут потребляться, делая любую последующую вилку раздвоенной из пустого потока.

const partnerStreams: Array<Stream<BaseData>> = []; 

partners.forEach((partner: string) => { 
    const partnerStream = sourceStream 
     .fork() 
     .filter((item: BaseData) => { 
      return item.id === partner; 
     } 
    ); 

    partnerStreams.push(partnerStream); 
}); 

partnerStreams.forEach((stream, index) => { 
    console.log(index, stream); 
    stream.toArray((foo) => { 
     console.log(index, foo); 
    }); 
}); 

Он печатает:

0 [ { id: 'foo', data: 'poit' }, { id: 'foo', data: 'fnord' } ] 
1 [ { id: 'bar', data: 'narf' } ] 
1

partnerStream.filter() возвращает новый поток. Затем вы снова потребляете partnerStream, используя partnerStream.each(), не звоните fork() или observe(). Поэтому либо цепочка partnerStream.filter().each() вызывает, либо присваивает возвращаемому значению значение partnerStream.filter() переменной и вызывается .each().

+0

Я обновил мой вопрос, как сейчас я не вижу ошибки, но я вижу только возвращаемые значения одного потока, а не оба из них. – k0pernikus

 Смежные вопросы

  • Нет связанных вопросов^_^