0

Я новичок в kafka и бурю. Я пытался реализовать Java-пример, который объединяет Кафку и шторм. Я нашел пример в Интернете. Я пытаюсь запустить java-программу в Eclipse IDE. Я не использую maven.Интеграция Kafka Storm с java. kafka.api.OffsetRequest.DefaultClientId() Ljava/языки/String; Ошибка

У меня есть storm-kafka-0.10.0.jar, kafka-0.6.jar, scala-library-2.10.3.jar и storm-core-0.10.0.jar как внешние банки.

Вот мой код Java.

KafkaStormSample.java

import backtype.storm.Config; 
import backtype.storm.LocalCluster; 
import backtype.storm.topology.TopologyBuilder; 

import java.util.UUID; 

import backtype.storm.spout.SchemeAsMultiScheme; 
import storm.kafka.ZkHosts; 
import storm.kafka.BrokerHosts; 
import storm.kafka.SpoutConfig; 
import storm.kafka.KafkaSpout; 
import storm.kafka.StringScheme; 

public class KafkaStormSample { 
    public static void main(String[] args) throws Exception{ 
     Config config = new Config(); 
     config.setDebug(true); 
     config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); 
     String zkConnString = "localhost:2181"; 
     String topic = "my-first-topic"; 
     BrokerHosts hosts = new ZkHosts(zkConnString); 

     SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,  
     UUID.randomUUID().toString()); 
     kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4; 
     kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4; 
    //kafkaSpoutConfig.forceFromStart = true; 
     kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 

     TopologyBuilder builder = new TopologyBuilder(); 
     builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig)); 
    //builder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout"); 
     builder.setBolt("word-counter", new CountBolt()).shuffleGrouping("word-spitter"); 

     LocalCluster cluster = new LocalCluster(); 
     cluster.submitTopology("KafkaStormSample", config, builder.createTopology()); 

     Thread.sleep(10000); 

     cluster.shutdown(); 
    } 
} 

CountBolt.java

import java.util.Map; 
import java.util.HashMap; 

import backtype.storm.tuple.Tuple; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.IRichBolt; 
import backtype.storm.task.TopologyContext; 

public class CountBolt implements IRichBolt{ 
    Map<String, Integer> counters; 
    private OutputCollector collector; 

    @Override 
    public void prepare(Map stormConf, TopologyContext context, 
    OutputCollector collector) { 
     this.counters = new HashMap<String, Integer>(); 
     this.collector = collector; 
    } 

    @Override 
    public void execute(Tuple input) { 
     String str = input.getString(0); 

     if(!counters.containsKey(str)){ 
     counters.put(str, 1); 
     }else { 
     Integer c = counters.get(str) +1; 
     counters.put(str, c); 
     } 

     collector.ack(input); 
    } 

    @Override 
    public void cleanup() { 
     for(Map.Entry<String, Integer> entry:counters.entrySet()){ 
     System.out.println(entry.getKey()+" : " + entry.getValue()); 
     } 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 

    } 

    @Override 
    public Map<String, Object> getComponentConfiguration() { 
     return null; 
    } 
} 

Когда я пытаюсь запустить kafkaStormSample.java я получаю ошибку ниже.

Exception in thread "main" java.lang.NoSuchMethodError: kafka.api.OffsetRequest.DefaultClientId()Ljava/lang/String; 
    at storm.kafka.KafkaConfig.<init>(KafkaConfig.java:43) 
    at storm.kafka.SpoutConfig.<init>(SpoutConfig.java:40) 
    at KafkaStormSample.main(KafkaStormSample.java:23) 

Я убедился, что у меня есть все необходимые банки. Но все же я думаю, что мне не хватает кувшина.

Любая помощь будет оценена по достоинству.

Спасибо!

ответ

0

Я мало знаю об этих системах, но он выглядит как несоответствие версии библиотеки.

Одна из библиотек (случай Storm in thids) была скомпилирована против другой версии kafka, где этот метод определен. Проверьте свои зависимости.

Является ли одна из причин, почему системы управления зависимыми полезны.

Update: Из их документации они обеспечивают этот набор на Maven:

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.10</artifactId> 
     <version>0.8.1.1</version> 
     <exclusions> 
      <exclusion> 
       <groupId>org.apache.zookeeper</groupId> 
       <artifactId>zookeeper</artifactId> 
      </exclusion> 
      <exclusion> 
       <groupId>log4j</groupId> 
       <artifactId>log4j</artifactId> 
      </exclusion> 
     </exclusions> 
    </dependency> 

Кажется, ваша Кафка версия слишком стара.

 Смежные вопросы

  • Нет связанных вопросов^_^