2016-06-20 2 views
0

Есть ли способ динамически создавать топологии в трезубцах? Не могли бы вы привести примеры?Apache storm Trident - динамическое создание топологий

+0

Вы можете хранить топологии конфигурации в некоторых свойствах файла (JSON) и когда ваше развертывании топологии вы можете прочитать его из этого файла. но как только вы развернете его, вы не можете изменить его динамически –

ответ

0

Прежде всего, вы также можете знать, что создание топологий не является частью Trident. Trident - это просто API для микрообработки.

И создание новых топологий по своей сути является динамичным. Это то, что делает класс TopologyBuilder.

Итак, чтобы ответить на ваш вопрос, да, можно создать новые топологии от Trident или от простых шторок и болтов. Единственное, что вам нужно, это то, что ваша логика создания топологии должна иметь доступ к кластеру Storm (классы и другие ресурсы), которая снова по определению удовлетворена, если вы запускаете свою логику в Storm.

Последнее, что вам понадобится, это найти способ отправки вновь созданной топологии, и это то, что было сделано для класса StormSubmitter, что опять (! Surprise :)) по определениям, которые удовлетворяют вашим путям классов когда вы запускаете свою логику внутри Trident или обычного желоба/болта.

Из любопытства, почему вы планируете это делать? Каковы ваши требования?

Пример:

import java.util.Map; 

import org.apache.storm.Config; 
import org.apache.storm.StormSubmitter; 
import org.apache.storm.generated.StormTopology; 
import org.apache.storm.spout.SpoutOutputCollector; 
import org.apache.storm.task.TopologyContext; 
import org.apache.storm.topology.OutputFieldsDeclarer; 
import org.apache.storm.topology.TopologyBuilder; 
import org.apache.storm.topology.base.BaseRichSpout; 
import org.apache.storm.trident.operation.TridentCollector; 
import org.apache.storm.trident.spout.IBatchSpout; 
import org.apache.storm.tuple.Fields; 

public class DynamicTopologySpout implements IBatchSpout { 

    private static final long serialVersionUID = -3269435263455830842L; 

    @Override 
    @SuppressWarnings("rawtypes") 
    public void open(Map conf, TopologyContext context) {} 

    @Override 
    public void emitBatch(long batchId, TridentCollector collector) { 
     if (newTopologyNeeded()) { 
      TopologyBuilder builder = new TopologyBuilder(); 
      builder 
      .setSpout("spout", new BaseRichSpout() { 
       private static final long serialVersionUID = 1L; 
       @Override public void declareOutputFields(OutputFieldsDeclarer declarer) {} 
       @Override @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {} 
       @Override public void nextTuple() {} 
      }, 1) 
      .setMaxSpoutPending(15) 
      .setNumTasks(1); 
      StormTopology topology = builder.createTopology(); 
      Config config = new Config(); 
      try { 
       StormSubmitter.submitTopology("dynamic-topology", config, topology); 
      } catch (Exception e) { 
       e.printStackTrace(); 
       collector.reportError(e); 
      } 
     } 
    } 

    private boolean newTopologyNeeded() { 
     // Check if topology needed ... 
     return false; 
    } 

    @Override 
    public void ack(long batchId) {} 

    @Override 
    public void close() {} 

    @Override 
    public Map<String, Object> getComponentConfiguration() { return null; } 

    @Override 
    public Fields getOutputFields() { return null; } 

}