3

я штормовая топология разработана для получения данных JSONArray от Кафки брокеров на Hortonworks,Моих KafkaSpout не потребляют сообщения от Кафки брокеров в HDP

Я не знаю, почему мой kafkaSpout не потребляет сообщения от Кафки брокеров в HDP, однако топология шторма успешно отправлена, но когда я визуализирую топологию: 0% данных было уничтожено!

topology visualisation

Это моя схема класс:

public class ClientInfosSheme implements Scheme{ 
private static final long serialVersionUID = -2990121166902741545L; 
private static final Logger LOG = Logger.getLogger(ClientInfosSheme.class); 
public String codeBanque; 
public String codeAgence; 
public String codeGuichet; 
public String devise; 
public String numCompte; 
public String codeClient; 
public String codeOperation; 
public String sensOperation; 
public String montantOperation; 
public String dateValeur; 
public String dateComptable; 
public String utilisateur; 

public static final String CODEBANQUE="codeBanque"; 
public static final String CODEAGENCE="codeAgence"; 
public static final String CODEGUICHET="codeGuichet"; 
public static final String DEVISE="devise"; 
public static final String NUMCOMPTE="numCompte"; 
public static final String CODECLIENT="codeClient"; 
public static final String CODEOPERATION="codeOperation"; 
public static final String SENSOPERATION="sensOperation"; 
public static final String MONTANTOPERATION="montantOperation"; 
public static final String DATEVALEUR="dateValeur"; 
public static final String DATECOMPTABLE="dateComptable"; 
public static final String UTILISATEUR="utilisateur"; 

public List<Object> deserialize(byte[] bytes) { 

     try{ 
      String clientInfos = new String(bytes, "UTF-8"); 
       JSONArray JSON = new JSONArray(clientInfos); 
       for(int i=0;i<JSON.length();i++) { 
        JSONObject object_clientInfos=JSON.getJSONObject(i); 
       try{  

        //Récupérations des données 

         this.codeBanque=object_clientInfos.getString("codeBanque"); 
         this.codeAgence=object_clientInfos.getString("codeAgence"); 
         this.codeGuichet=object_clientInfos.getString("codeGuichet"); 
         this.devise=object_clientInfos.getString("devise"); 
         this.numCompte=object_clientInfos.getString("numCompte"); 
         this.codeClient=object_clientInfos.getString("codeClient"); 
         this.codeOperation=object_clientInfos.getString("codeOperation"); 
         this.sensOperation=object_clientInfos.getString("sensOperation"); 
         this.montantOperation=object_clientInfos.getString("montantOperation"); 
         this.dateValeur=object_clientInfos.getString("dateValeur"); 
         this.dateComptable=object_clientInfos.getString("dateComptable"); 
         this.utilisateur=object_clientInfos.getString("utilisateur"); 

        } 
        catch(Exception e) 
           { 
            e.printStackTrace(); 
           } 


    }// End For Loop 



     } catch (JSONException e1) { 
     // TODO Auto-generated catch block 
     e1.printStackTrace(); 
    } catch (UnsupportedEncodingException e1) { 
     // TODO Auto-generated catch block 
     e1.printStackTrace(); 
    } 
     return new Values(codeBanque, codeAgence, codeGuichet, devise, numCompte, codeClient, codeOperation, sensOperation, 
       montantOperation,dateValeur, dateComptable,utilisateur); 

}// End Function deserialize 

public Fields getOutputFields() { 
     return new Fields(CODEBANQUE,CODEAGENCE,CODEGUICHET,DEVISE,NUMCOMPTE, 
       CODECLIENT,CODEOPERATION, SENSOPERATION,MONTANTOPERATION,DATEVALEUR,DATECOMPTABLE,UTILISATEUR); 
    } 


} 

и свойство файл:

#Broker host 
kafka.zookeeper.host.port=sandbox.hortonworks.com 

#Kafka topic to consume. 
kafka.topic=INFOCLIENT 

#Location in ZK for the Kafka spout to store state. 
kafka.zkRoot=/client_infos_sprout 

#Kafka Spout Executors. 
spout.thread.count=1 

Когда я использую другой потребитель данные storted в Кафках П нравится:

[{"codeBanque":"xx","codeAgence":"yy","codeGuichet":"zz","devise":"tt"..}, 
{"codeBanque":"xx1","codeAgence":"yy1","codeGuichet":"zz1","devise":"tt1"..}, 
{"codeBanque":"xx2","codeAgence":"yy2","codeGuichet":"zz2","devise":"tt2"..}] 

поэтому моя проблема Почему он не потребляет сообщения от Kafka Brokers?

Пожалуйста, мне нужна помощь

+0

Вы дважды проверяли правильность имени темы, IP/Hostname и т. Д.? Вы проверяли журналы Storm и Kafka на наличие сообщений об ошибках? –

+0

Привет @ MatthiasJ.Sax я дважды проверял, и я обнаружил, что когда я изменил '#Broker host' на:' kafka.zookeeper.host.port = 192.168.1.78: 2181' у меня возникла эта проблема: _java.lang.RuntimeException: java .lang.IllegalArgumentException: a || b || c || calculCleRib (a, b, c) не существует в backtype.storm.utils.DisruptorQueue.consumeBatchToCursor (DisruptorQueue.java:128) _ –

+0

Привет, когда я проверяю интерфейс STORM, я видел, что mssgs испускаются и передаются, но не подбрасываются! Я получил это сообщение: Количество Tuple, которые были явно провалились или были удалены до того, как acking был завершен. ожидается, что значение 0 не будет выполнено. –

ответ

1

Как вы обнаружили в журналах, ваш носик не «потребляют» сообщения, так как топология имеет ошибку и не Ack кортежи - отсюда Носик повторит их , Это работает так, как было разработано.

Как только ваша топология будет стабильной, вы увидите, что смещение увеличивается. До этого Spout отправляет сообщения в топологию, но вы не сможете наблюдать за результатами.

Не видя метод calculCleRib и как он интегрируется в вашу топологию, мы не можем помочь вам отладить этот аспект.