2013-06-04 3 views
10

Я пытаюсь написать топологию, которая выполняет следующее:Группировки в простой агрегации топологии штормовой

  1. носик, который присоединяется к Твиттеру (на основе ключевого слова)
  2. болта агрегации, что суммирует несколько твитов (скажем, N) в коллекции и отправляет им болт принтера
  3. Простой болт, который сразу выводит коллекцию на консоль.

В действительности, я хочу сделать еще немного обработки в коллекции.

Я тестировал его локально и выглядел так, как будто он работает. Тем не менее, я не уверен, правильно ли я установил группировки на болтах, и если это будет корректно работать при развертывании в реальном кластере штормов. Я был бы признателен, если кто-то сможет помочь в рассмотрении этой топологии и предложить любые ошибки, изменения или улучшения.

Спасибо.

Это то, на что похожа моя топология.

builder.setSpout("spout", new TwitterFilterSpout("pittsburgh")); 
    builder.setBolt("sampleaggregate", new SampleAggregatorBolt()) 
       .shuffleGrouping("spout"); 
    builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate"); 

Aggregation Болт

public class SampleAggregatorBolt implements IRichBolt { 

    protected OutputCollector collector; 
    protected Tuple currentTuple; 
    protected Logger log; 
    /** 
    * Holds the messages in the bolt till you are ready to send them out 
    */ 
    protected List<Status> statusCache; 

    @Override 
    public void prepare(Map stormConf, TopologyContext context, 
         OutputCollector collector) { 
     this.collector = collector; 

     log = Logger.getLogger(getClass().getName()); 
     statusCache = new ArrayList<Status>(); 
    } 

    @Override 
    public void execute(Tuple tuple) { 
     currentTuple = tuple; 

     Status currentStatus = null; 
     try { 
      currentStatus = (Status) tuple.getValue(0); 
     } catch (ClassCastException e) { 
     } 
     if (currentStatus != null) { 

      //add it to the status cache 
      statusCache.add(currentStatus); 
      collector.ack(tuple); 


      //check the size of the status cache and pass it to the next stage if you have enough messages to emit 
      if (statusCache.size() > 10) { 
       collector.emit(new Values(statusCache)); 
      } 

     } 
    } 

    @Override 
    public void cleanup() { 


    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("tweets")); 

    } 

    @Override 
    public Map<String, Object> getComponentConfiguration() { 
     return null; //To change body of implemented methods use File | Settings | File Templates. 
    } 


    protected void setupNonSerializableAttributes() { 

    } 

} 

Болт Принтер

public class PrinterBolt extends BaseBasicBolt { 

    @Override 
    public void execute(Tuple tuple, BasicOutputCollector collector) { 
     System.out.println(tuple.size() + " " + tuple); 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer ofd) { 
    } 

} 

ответ

4

Из того, что я могу видеть, что это хорошо выглядит. Тем не менее, дьявол в деталях. Я не уверен, что делает ваш агрегатор, но если он делает какие-либо предположения о передаваемых значениях, то вы должны рассмотреть соответствующую группировку полей. Это может не сделать такой большой разницы, поскольку вы используете намек на параллелизм по умолчанию 1, но если вы решите масштабировать с помощью нескольких экземпляров совокупного болта, то подразумеваемые логические допущения, которые вы делаете, могут требовать группировки без перетасовки.

+0

Я предоставил код для болта агрегатора выше (см. Метод выполнения). На данный момент он ждет, пока он накопит N (10 в приведенном выше примере) сообщениях и разложит их, как только у него будет 10 сообщений. BTW Я только что нашел ошибку, которую я исправлю. Мне нужно очистить кеш, как только я испущу значения. Итак, какие изменения должны быть необходимы, если мне нужно использовать более одного агрегатора. –

0

Привет, как только вы пытаетесь подписаться на более чем одно ключевое слово, вы столкнетесь с проблемами. Я предлагаю, чтобы ваш носик также испускал исходное ключевое слово, которое использовалось для фильтрации.

Тогда вместо того, чтобы делать shuffleGrouping я хотел бы сделать fieldsGrouping

builder.setBolt("sampleaggregate", new SampleAggregatorBolt()) 
      .shuffleGrouping("spout", new Fields("keyword")); 

Таким образом, вы убедитесь, что результаты одного ключевого слова в конечном итоге на одном болте каждый раз. Таким образом, вы можете правильно вычислить агрегаты. Если вы опускаете поля Grouping Storm, можете создать экземпляр любой суммы вашего совокупного болта и отправить любые сообщения от носика в любой экземпляр совокупного болта, который в конечном итоге приведет к неправильным результатам.