2014-01-23 1 views
1

Мы используем топологию подсчета слов в локальном режиме. Когда мы используем метод закрытия, мы получаем ошибки типа «java.net.connectException», и весь процесс завершается . Когда мы используем метод cluster.killtopology, процесс не может быть полностью завершен. Хотя использование метода killtopology и shutdown иногда выполняется топология, иногда отображается ошибка «java.net.connectException», и весь процесс завершается. Не могли бы вы рассказать, в чем проблема?Разница между методом выключения и уничтожения топологии и как убить топологию

Это наш код:

Главная программа:

import backtype.storm.Config; 
import backtype.storm.LocalCluster; 
import backtype.storm.topology.TopologyBuilder; 
import backtype.storm.tuple.Fields; 

public class TopologyMain { 
    public static void main(String[] args) throws InterruptedException { 
     //Topology definition 
     TopologyBuilder builder = new TopologyBuilder(); 
     builder.setSpout("word-reader",new WordReader()); 
     builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader"); 
     builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word")); 
     //Configuration 
     Config conf = new Config(); 
     conf.put("wordsFile","E:\\words.txt"); 

     conf.setDebug(false); 
     //Topology run 
     conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); 

     LocalCluster cluster = new LocalCluster(); 
     cluster.submitTopology("wordcount", conf, builder.createTopology()); 
     Thread.sleep(1000); 
     cluster.shutdown(); 
    } 
} 

Носик программа:

import java.io.BufferedReader; 
import java.io.FileNotFoundException; 
import java.io.FileReader; 
import java.io.*; 
import java.util.Map; 
import backtype.storm.spout.SpoutOutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.IRichSpout; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Values; 

public class WordReader implements IRichSpout { 
    private SpoutOutputCollector collector; 
    Map<String, Object> count; 

    private FileReader fileReader; 
    private boolean completed = false; 
    private TopologyContext context; 

    public boolean isDistributed() {return false;} 
    public void ack(Object msgId) { 
     System.out.println("OK:"+msgId); 
    } 
    public void close() {} 
    public void fail(Object msgId) { 
     System.out.println("FAIL:"+msgId); 
    } 

    /** 
    * The only thing that the methods will do It is emit each 
    * file line 
    */ 
    public void nextTuple() { 
     /** 
     * The nextuple it is called forever, so if we have been readed the file 
     * we will wait and then return 
     */ 
     if(completed){ 
      try { 
       Thread.sleep(1000); 
      } catch (InterruptedException e) { 
       //Do nothing 
      } 
      return; 
     } 
     String str; 
     //Open the reader 
     BufferedReader reader = new BufferedReader(fileReader); 
     try{ 
      //Read all lines 
      while((str = reader.readLine()) != null){ 
       /** 
        * By each line emmit a new value with the line as a their 
        */ 
       this.collector.emit(new Values(str),str); 
      } 
     }catch(Exception e){ 
      throw new RuntimeException("Error reading tuple",e); 
     }finally{ 
      completed = true; 
     } 
    } 

    /** 
    * We will create the file and get the collector object 
    */ 
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 
     try { 
      this.context = context; 
      this.fileReader = new FileReader(conf.get("wordsFile").toString()); 
     } catch (FileNotFoundException e) { 
      throw new RuntimeException("Error reading file["+conf.get("wordFile")+"]"); 
     } 
     this.collector = collector; 
    } 

    /** 
    * Declare the output field "word" 
    */ 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("str")); 
    } 

    public void deactivate(){} 
    public void activate(){} 
    public Map<String, Object> getComponentConfiguration(){return count;} 
} 

Болт нормализатор программа :::

import java.util.ArrayList; 
import java.util.List; 
import java.util.Map; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.IRichBolt; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Tuple; 
import backtype.storm.tuple.Values; 

public class WordNormalizer implements IRichBolt { 
    private OutputCollector collector; 
    Map<String, Object> count; 

    public void cleanup() {} 
    /** 
    * The bolt will receive the line from the 
    * words file and process it to Normalize this line 
    * 
    * The normalize will be put the words in lower case 
    * and split the line to get all words in this 
    */ 
    public void execute(Tuple input) { 
     String sentence = input.getString(0); 
     String[] words = sentence.split(" "); 
     for(String word : words){ 
      word = word.trim(); 
      if(!word.isEmpty()){ 
       word = word.toLowerCase(); 
       //Emit the word 
       List a = new ArrayList(); 
       a.add(input); 
       collector.emit(a,new Values(word)); 
      } 
     } 
     // Acknowledge the tuple 
     collector.ack(input); 
    } 

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
     this.collector = collector; 
    } 

    /** 
    * The bolt will only emit the field "word" 
    */ 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("word")); 
    } 

    public Map<String, Object> getComponentConfiguration(){return count;} 
    } 

Болт счетчик команд ::

import java.util.HashMap; 
import java.util.Map; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.IRichBolt; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.tuple.Tuple; 
import backtype.storm.tuple.Fields; 

public class WordCounter implements IRichBolt { 
    Integer id; 
    String name; 
    Map<String, Integer> counters; 
    Map<String, Object> count; 
    private OutputCollector collector; 

    /** 
    * At the end of the spout (when the cluster is shutdown 
    * We will show the word counters 
    */ 
    @Override 
    public void cleanup() { 
     System.out.println("-- Word Counter ["+name+"-"+id+"] --"); 
     for(Map.Entry<String, Integer> entry : counters.entrySet()){ 
      System.out.println(entry.getKey()+": "+entry.getValue()); 
     } 
    } 

    /** 
    * On each word We will count 
    */ 
    @Override 
    public void execute(Tuple input) { 
     String str = input.getString(0); 
     /** 
     * If the word dosn't exist in the map we will create 
* this, if not We will add 1 
     */ 
     if(!counters.containsKey(str)){ 
      counters.put(str, 1); 
     }else{ 
      Integer c = counters.get(str) + 1; 
      counters.put(str, c); 
     } 
     //Set the tuple as Acknowledge 
     collector.ack(input); 
    } 

    /** 
    * On create 
    */ 
    @Override 
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
     this.counters = new HashMap<String, Integer>(); 
     this.collector = collector; 
     this.name = context.getThisComponentId(); 
     this.id = context.getThisTaskId(); 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("str", "c")); 
    } 

    public void deactivate(){} 

    public Map<String, Object> getComponentConfiguration(){return count;} 

} 

Пожалуйста направьте нам решить эту проблему.

+0

топология иногда работает иногда показывает ошибку, пожалуйста, помогите нам – user3119134

ответ

1

С помощью метода cluster.killTopology("WordCount"); вы убиваете только топологию, кластер будет продолжать работу. С помощью cluster.shutdown(); вы убиваете весь кластер с запущенными топологиями в этом кластере.

Я не думаю, что убийство топологии является причиной вашей проблемы. Было бы неплохо прикрепить журнал из приложения. Попробуйте проверить это treat. Вероятно, порты, использующие шторм, беспорядочно используются и вызывают вашу проблему.

 Смежные вопросы

  • Нет связанных вопросов^_^