2016-12-31 12 views
0

Я использую Apache Кафка версии kafka_2.10-0.10.1.0Apache Кафка-Исключение: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe (Ljava/Util/Список;) V

Во время POC я создали простой производитель и потребитель. Когда я пытаюсь потреблять сообщение получать следующие ошибки:

Exception in thread "Thread-0" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;)V 
at com.spnotes.kafka.simple.Consumer$ConsumerThread.run(Consumer.java:59) 

Код:

package com.spnotes.kafka.simple; 

import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.common.errors.WakeupException; 

import java.util.Arrays; 
import java.util.Properties; 
import java.util.Scanner; 


public class Consumer { 
    private static Scanner in; 

    public static void main(String[] argv)throws Exception{ 
     if (argv.length != 2) { 
      System.err.printf("Usage: %s <topicName> <groupId>\n", 
        Consumer.class.getSimpleName()); 
      System.exit(-1); 
     } 
     in = new Scanner(System.in); 
     String topicName = argv[0]; 
     String groupId = argv[1]; 

     ConsumerThread consumerRunnable = new ConsumerThread(topicName,groupId); 
     consumerRunnable.start(); 
     String line = ""; 
     while (!line.equals("exit")) { 
      line = in.next(); 
     } 
     consumerRunnable.getKafkaConsumer().wakeup(); 
     System.out.println("Stopping consumer ....."); 
     consumerRunnable.join(); 
    } 

    private static class ConsumerThread extends Thread{ 
     private String topicName; 
     private String groupId; 
     private KafkaConsumer<String,String> kafkaConsumer; 

     public ConsumerThread(String topicName, String groupId){ 
      this.topicName = topicName; 
      this.groupId = groupId; 
     } 
     public void run() { 
      Properties configProperties = new Properties(); 
      configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
      configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); 
      configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); 
      configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 
      configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple"); 

      //Figure out where to start processing messages from 
      kafkaConsumer = new KafkaConsumer<String, String>(configProperties); 
      kafkaConsumer.subscribe(Arrays.asList(topicName)); 
      //Start processing messages 
      try { 
       while (true) { 
        ConsumerRecords<String, String> records = kafkaConsumer.poll(100); 
        for (ConsumerRecord<String, String> record : records) 
         System.out.println(record.value()); 
       } 
      }catch(WakeupException ex){ 
       System.out.println("Exception caught " + ex.getMessage()); 
      }finally{ 
       kafkaConsumer.close(); 
       System.out.println("After closing KafkaConsumer"); 
      } 
     } 
     public KafkaConsumer<String,String> getKafkaConsumer(){ 
      return this.kafkaConsumer; 
     } 
    } 
} 

Губит с помощью команды:

java -cp .:/home/osboxes/Kafka/kafka_2.10-0.10.1.1/libs/*:/home/osboxes/Kafka/kafka_2.10-0.10.1.1/libs/KafkaAPIClient-1.0-SNAPSHOT.jar com.spnotes.kafka.simple.Consumer test group1 

Ошибка:

Exception in thread "Thread-0" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;)V 
    at com.spnotes.kafka.simple.Consumer$ConsumerThread.run(Consumer.java:59) 
+0

KafkaConsumer.subsribe (java.utils.List) - это 0.9.0 API. Проверьте версию клиента, которую вы используете, чтобы убедиться, что используется 0,10. *. – amethystic

ответ

2

Я использовал неправильный API для amethystic. После добавления новой версии клиента он начнет работать

Старый API был в POM

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.9.0.0</version> 
    </dependency> 

Добавлен новый API в POM

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.10.0.0</version> 
    </dependency> 

Нет обратной совместимости между Кафка API старой и новой версии

 Смежные вопросы

  • Нет связанных вопросов^_^