2017-01-31 7 views
0

Недавно мы перешли на HDP 2.5 с Kafka 0.10.0 и Spark 1.6.2. Поэтому я изменил свой pom и некоторые из API для работы с новым Kafka. Я могу запустить код, но я не вижу сообщений. Я добавил фрагмент кода ниже. Я также разместил своего пом. Я не уверен, что здесь происходит не так. Может кому-то помочь.Невозможно получить любые сообщения в Kafka 0.10.0 with Spark stream 1.6.2

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName(
     "SparkApp"); 
JavaStreamingContext jssc = new JavaStreamingContext(conf, 
     Durations.seconds(2));  

Map<String, Integer> topicMap = new HashMap<String, Integer>(); 
topicMap.put(this.topic, this.numThreads); 

Map<String, String> kafkaParams = new HashMap<>(); 
kafkaParams.put("metadata.broker.list", kfkBroker); 
kafkaParams.put("zookeeper.connect", zkBroker); 
kafkaParams.put("group.id", "default"); 
kafkaParams.put("fetch.message.max.bytes", "60000000");     

JavaPairReceiverInputDStream<String, String> kafkaInStream = KafkaUtils.createStream(
    jssc, 
    String.class, 
    String.class, 
    kafka.serializer.StringDecoder.class, 
    kafka.serializer.StringDecoder.class, 
    kafkaParams, 
    topicMap, 
    StorageLevel.MEMORY_AND_DISK()); 

    kafkaInStream.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() 
{ 

    /** 
    * 
    */ 
    private static final long serialVersionUID = 1L; 

    public void call(JavaPairRDD<String, String> v1) throws Exception 
    { 

     System.out.println("inside call.. JavaPairRDD size " + v1.count()); 
     for (Tuple2<String, String> test : v1.collect()) 
     { 
      this.eventMessage.setMessage(test._2); 
     } 

    } 

}); 

я получаю выход «внутренний вызов .. JavaPairRDD размер 0» всегда что указывает, что искра не читает никаких данных. Я попытался подтолкнуть некоторые данные к теме через консольный клиент. Но это не помогло.

Вот мой pom.xml (только зависимости добавлено)

<dependencies> 
    <dependency> 
     <groupId>log4j</groupId> 
     <artifactId>log4j</artifactId> 
     <version>1.2.17</version> 
    </dependency> 
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.10.1.1</version> 
    </dependency> 
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10 --> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.10</artifactId> 
     <version>0.10.1.1</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.10</artifactId> 
     <version>1.6.2</version> 
     <scope>provided</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.10</artifactId> 
     <version>1.6.2</version> 
     <scope>provided</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka_2.10</artifactId> 
     <version>1.6.2</version> 
     <scope>provided</scope> 
    </dependency> 

    <dependency> 
     <groupId>org.json</groupId> 
     <artifactId>json</artifactId> 
     <version>20160810</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.hadoop</groupId> 
     <artifactId>hadoop-common</artifactId> 
     <version>2.7.3</version> 
     <scope>provided</scope> 
    </dependency> 
    <dependency> 
     <groupId>com.101tec</groupId> 
     <artifactId>zkclient</artifactId> 
     <version>0.8</version> 
    </dependency> 
</dependencies> 

ответ

1

spark-streaming-kafka_2.10 работает только с Кафкой 0.8+ клиентом. Вы можете использовать клиент Kafka 0.8+ для подключения к кластеру 0.10+, но при этом потеряете некоторую производительность.

Я предлагаю вам использовать --packages, чтобы отправить заявку, чтобы избежать установки Kafka в ваших зависимостях. Например,

bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.2 ... 
+0

Спасибо. Какова рекомендуемая версия для Kafka 0.10 и искры 1.6.2? Репозиторий Maven показывает 1.6.2 для искрового потока. – AnswerSeeker

+0

Вам не нужно добавлять клиента Kafka в свой pom.xml. 'spark-streaming-kafka_2.10' уже зависит от этого. – zsxwing

+0

Спасибо за подсказку. Может быть, мой вопрос был неправильным. Какую версию искрообразования Кафку следует использовать в моем проекте для моего Kafka 0.10 и искры 1.6.2? – AnswerSeeker