2015-11-20 3 views
1

Как я могу испускать несколько потоков из того же болта в Storm Trident?Испускать несколько потоков в Storm Trident

У меня есть болт, который выполняет некоторые вычисления и на основе результата я хочу передать некоторые значения одному потоку и некоторым другим значениям в другой поток.

В шторм (не Trident), мы могли бы добиться того, что с помощью следующего:

разделения потока на несколько потоков:

@Override 
public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) { 
    outputFieldsDeclarer.declareStream("type1-stream", new Fields("type1")); 
    outputFieldsDeclarer.declareStream("type2-stream", new Fields("type2")); 
    outputFieldsDeclarer.declareStream("error-stream", new Fields("error")); 
} 

Затем выделяют на основе результатов, как:

collector.emit("type1-stream", new Values("type 1 data")); 
collector.emit("type2-stream", new Values("type 2 data")); 
collector.emit("error-stream", new Values("error data")); 

Затем выполните оставшуюся работу, прослушав ожидаемый поток:

builder.setBolt("errorBolt", errorBolt).shuffleGrouping("errorBoltStream", "error-stream"); 
builder.setBolt("type1Bolt", type1Bolt).shuffleGrouping("type1BoltStream", "type1-stream"); 

Как я могу достичь такого же поведения, используя Storm Trident?

Один из вариантов - «каждый» для одного и того же потока и запускает один и тот же болт и испускает только на основе того, что я хочу передать этому потоку, или другой вариант - это испускать пару ключей и значений и фильтровать поток на основе ключа (например, type1, type2, error и т. д.) и снова создавать несколько потоков. Но ни один из них не кажется мне хорошим дизайном. Какой был бы лучший способ добиться этого?

+0

У меня такая же проблема. Существует проблема: https://issues.apache.org/jira/browse/STORM-68 –

ответ

0

AFAIK, вы не можете этого сделать. Чтобы разделить поток, вам необходимо будет сделать следующее:

// main stream 
Stream stream = topology.each(...) 

// stream 01 
Stream stream1 = stream.each(...) 

// stream 02 
Stream stream2 = stream.each(...) 

 Смежные вопросы

  • Нет связанных вопросов^_^