2014-12-08 1 views
2

Я реализовал простую топологию Storm с одним носиком и болтом, работающим в локальном режиме кластера.nextTuple() называется бесконечным временем с использованием BaseRichSpout on Storm

по какой-либо причине nextTuple() носика называется более одного раза.

Любая идея, почему?

код:

носик:

public class CommitFeedListener extends BaseRichSpout { 
    private SpoutOutputCollector outputCollector; 
    private List<String> commits; 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("commit")); 
    } 

    @Override 
    public void open(Map configMap, 
        TopologyContext context, 
        SpoutOutputCollector outputCollector) { 
     this.outputCollector = outputCollector; 
    } 

    **//that method is invoked more than once** 
    @Override 
    public void nextTuple() { 

      outputCollector.emit(new Values("testValue")); 

    } 
} 

болт:

public class EmailExtractor extends BaseBasicBolt { 
    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("email")); 
    } 
    @Override 
    public void execute(Tuple tuple, 
         BasicOutputCollector outputCollector) { 
     String commit = tuple.getStringByField("commit"); 
     System.out.println(commit);   
    } 
} 

текущей конфигурации:

public class LocalTopologyRunner { 
    private static final int TEN_MINUTES = 600000; 
    public static void main(String[] args) { 
     TopologyBuilder builder = new TopologyBuilder(); 
     builder.setSpout("commit-feed-listener", new CommitFeedListener()); 
       builder 
     .setBolt("email-extractor", new EmailExtractor()) 
       .shuffleGrouping("commit-feed-listener"); 
     Config config = new Config(); 
     config.setDebug(true); 
     StormTopology topology = builder.createTopology(); 
     LocalCluster cluster = new LocalCluster(); 
     cluster.submitTopology("github-commit-count-topology", 
       config, 
       topology); 
     Utils.sleep(TEN_MINUTES); 
     cluster.killTopology("github-commit-count"); 
     cluster.shutdown(); 
    } 
} 

спасибо всем, луч.

ответ

5

nextTuple() вызывается в бесконечном контуре по дизайну. Это делается так, чтобы использовать, например, грязные проверки против внешнего ресурса (базы данных, потока, ввода-вывода и т. Д.).

Вы должны спать некоторое время, чтобы предотвратить спам CPU с backtype.storm.utils.Utils, если вы не имеете ничего общего в nextTuple()

Utils.sleep(pollIntervalInMilliseconds); 

Сторма архитектура обработки в режиме реального времени, так что действительно, правильное поведение. Проверьте некоторые образцы, чтобы узнать, как реализовать носик в соответствии с вашими потребностями.

+0

Да, но если у меня есть входной файл, например. как только я закончу читать, я не хочу, чтобы он снова обрабатывал его снова. как вы могли избежать этого? – rayman

+0

Просто спать каждый раз с высоким значением миллиса для предотвращения спама процессора после того, как вы отправили свое значение один раз. Но носик, как средство проверки в реальном времени, всегда будет вызывать метод по дизайну. – zenbeni

+0

Но это не помогает мне, если я сплю. Я хочу знать, когда я закончил читать файл, например, или список. как я могу знать, что если этот метод продолжает звонить снова и снова ..? – rayman

0

Как насчет создания какого-либо флага и его установки при необходимости?

if (completed) { 
    try { 
     Utils.sleep(pollIntervalInMilliseconds); 
    } catch (InterruptedException e) { 
     // Do nothing 
    } 
    return; 
}