я штормовая топология разработана для получения данных JSONArray от Кафки брокеров на Hortonworks,Моих KafkaSpout не потребляют сообщения от Кафки брокеров в HDP
Я не знаю, почему мой kafkaSpout не потребляет сообщения от Кафки брокеров в HDP, однако топология шторма успешно отправлена, но когда я визуализирую топологию: 0% данных было уничтожено!
Это моя схема класс:
public class ClientInfosSheme implements Scheme{
private static final long serialVersionUID = -2990121166902741545L;
private static final Logger LOG = Logger.getLogger(ClientInfosSheme.class);
public String codeBanque;
public String codeAgence;
public String codeGuichet;
public String devise;
public String numCompte;
public String codeClient;
public String codeOperation;
public String sensOperation;
public String montantOperation;
public String dateValeur;
public String dateComptable;
public String utilisateur;
public static final String CODEBANQUE="codeBanque";
public static final String CODEAGENCE="codeAgence";
public static final String CODEGUICHET="codeGuichet";
public static final String DEVISE="devise";
public static final String NUMCOMPTE="numCompte";
public static final String CODECLIENT="codeClient";
public static final String CODEOPERATION="codeOperation";
public static final String SENSOPERATION="sensOperation";
public static final String MONTANTOPERATION="montantOperation";
public static final String DATEVALEUR="dateValeur";
public static final String DATECOMPTABLE="dateComptable";
public static final String UTILISATEUR="utilisateur";
public List<Object> deserialize(byte[] bytes) {
try{
String clientInfos = new String(bytes, "UTF-8");
JSONArray JSON = new JSONArray(clientInfos);
for(int i=0;i<JSON.length();i++) {
JSONObject object_clientInfos=JSON.getJSONObject(i);
try{
//Récupérations des données
this.codeBanque=object_clientInfos.getString("codeBanque");
this.codeAgence=object_clientInfos.getString("codeAgence");
this.codeGuichet=object_clientInfos.getString("codeGuichet");
this.devise=object_clientInfos.getString("devise");
this.numCompte=object_clientInfos.getString("numCompte");
this.codeClient=object_clientInfos.getString("codeClient");
this.codeOperation=object_clientInfos.getString("codeOperation");
this.sensOperation=object_clientInfos.getString("sensOperation");
this.montantOperation=object_clientInfos.getString("montantOperation");
this.dateValeur=object_clientInfos.getString("dateValeur");
this.dateComptable=object_clientInfos.getString("dateComptable");
this.utilisateur=object_clientInfos.getString("utilisateur");
}
catch(Exception e)
{
e.printStackTrace();
}
}// End For Loop
} catch (JSONException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
} catch (UnsupportedEncodingException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
return new Values(codeBanque, codeAgence, codeGuichet, devise, numCompte, codeClient, codeOperation, sensOperation,
montantOperation,dateValeur, dateComptable,utilisateur);
}// End Function deserialize
public Fields getOutputFields() {
return new Fields(CODEBANQUE,CODEAGENCE,CODEGUICHET,DEVISE,NUMCOMPTE,
CODECLIENT,CODEOPERATION, SENSOPERATION,MONTANTOPERATION,DATEVALEUR,DATECOMPTABLE,UTILISATEUR);
}
}
и свойство файл:
#Broker host
kafka.zookeeper.host.port=sandbox.hortonworks.com
#Kafka topic to consume.
kafka.topic=INFOCLIENT
#Location in ZK for the Kafka spout to store state.
kafka.zkRoot=/client_infos_sprout
#Kafka Spout Executors.
spout.thread.count=1
Когда я использую другой потребитель данные storted в Кафках П нравится:
[{"codeBanque":"xx","codeAgence":"yy","codeGuichet":"zz","devise":"tt"..},
{"codeBanque":"xx1","codeAgence":"yy1","codeGuichet":"zz1","devise":"tt1"..},
{"codeBanque":"xx2","codeAgence":"yy2","codeGuichet":"zz2","devise":"tt2"..}]
поэтому моя проблема Почему он не потребляет сообщения от Kafka Brokers?
Пожалуйста, мне нужна помощь
Вы дважды проверяли правильность имени темы, IP/Hostname и т. Д.? Вы проверяли журналы Storm и Kafka на наличие сообщений об ошибках? –
Привет @ MatthiasJ.Sax я дважды проверял, и я обнаружил, что когда я изменил '#Broker host' на:' kafka.zookeeper.host.port = 192.168.1.78: 2181' у меня возникла эта проблема: _java.lang.RuntimeException: java .lang.IllegalArgumentException: a || b || c || calculCleRib (a, b, c) не существует в backtype.storm.utils.DisruptorQueue.consumeBatchToCursor (DisruptorQueue.java:128) _ –
Привет, когда я проверяю интерфейс STORM, я видел, что mssgs испускаются и передаются, но не подбрасываются! Я получил это сообщение: Количество Tuple, которые были явно провалились или были удалены до того, как acking был завершен. ожидается, что значение 0 не будет выполнено. –