У меня есть sourceStream
, состоящий из BaseData
объектов.Как разбить поток, используя highland.js?
Я хочу разветвить этот поток на n
-отображение различных потоков, которые затем фильтруют и преобразуют каждый объект BaseData
по своему вкусу.
В конце концов, я хочу, чтобы потоки n
содержали только определенный тип, а разветвленные потоки могут различаться по длине, так как данные могут быть отброшены или добавлены в будущем.
Я думал, что я мог бы установить его с помощью этого fork
:
import * as _ from 'highland';
interface BaseData {
id: string;
data: string;
}
const sourceStream = _([
{id: 'foo', data: 'poit'},
{id: 'foo', data: 'fnord'},
{id: 'bar', data: 'narf'}]);
const partners = [
'foo',
'bar',
];
partners.forEach((partner: string) => {
const partnerStream = sourceStream.fork();
partnerStream.filter((baseData: BaseData) => {
return baseData.id === partner;
});
partnerStream.each(console.log);
});
Я ожидал теперь есть два потока, и foo
-stream содержит два элемента:
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
и bar
- для включения одного элемента:
{ id: 'bar', data: 'narf' }
Тем не менее, я получаю сообщение об ошибке:
/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338
throw new Error(
^
Error: Stream already being consumed, you must either fork() or observe()
at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15)
at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10)
at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18)
at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19)
at Array.forEach (native)
at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10)
at Module._compile (module.js:570:32)
at Object.Module._extensions..js (module.js:579:10)
at Module.load (module.js:487:32)
at tryModuleLoad (module.js:446:12)
Как разбить поток в несколько потоков?
Я также попытался цепочки вызовов, но я только получить обратно результат один поток в:
partners.forEach((partner: string) => {
console.log(partner);
const partnerStream = sourceStream
.fork()
.filter((item: BaseData) => {
return item.id === partner;
});
partnerStream.each((item: BaseData) => {
console.log(item);
});
});
Печать только:
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
Вместо ожидаемого:
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
{id: 'bar', data: 'narf'}
Возможно, это был так, что я не понял, был fork
. Согласно its doc entry:
Stream.fork() Форкс поток, что позволяет добавлять дополнительные потребитель с общим противодавлением. Поток, раздвоенный для нескольких потребителей, будет только вытащить значения из своего источника так быстро, как самый медленный потребитель может справиться с ними.
ПРИМЕЧАНИЕ. Не следует зависеть от согласованного порядка выполнения между вилками. Это преобразование гарантирует, что все вилки обработают значение foo , прежде чем кто-либо обработает вторую строку значений. Он не гарантирует порядок , в котором вилка обрабатывает foo.
СОВЕТ. Будьте осторожны с изменением значений потока в вилках (или с использованием библиотеки, которая делает это). Поскольку одно и то же значение будет передано на , каждая вилка, изменения, сделанные в одной вилке, будут видны в любой вилке, которая после него выполняет . Добавьте к этому непоследовательный порядок выполнения и вы можете получить тонкие ошибки повреждения данных.Если вам нужно изменить любые значения, вы должны сделать копию и изменить копию вместо этого.
Предупреждение об изъятии: в настоящее время можно разблокировать поток после того, как потребляет его (например, через преобразование). Это станет невозможным в следующем крупном выпуске. Если вы собираетесь разбить поток, всегда вызовите fork на нем.
Так что вместо «Как разбить поток?» мой фактический вопрос может быть следующим: как дублировать поток горных на лету в разные потоки?
Я обновил мой вопрос, как сейчас я не вижу ошибки, но я вижу только возвращаемые значения одного потока, а не оба из них. – k0pernikus