2015-12-13 3 views
1

Я новичок в искрообразовании и кафке, и я не понимаю этого исключения во время выполнения. Я уже установил сервер kafka.Spark Streaming + kafka «JobGenerator» java.lang.NoSuchMethodError

Exception in thread "JobGenerator" java.lang.NoSuchMethodError: org.apache.spark.streaming.scheduler.InputInfoTracker.reportInfo(Lorg/apache/spark/streaming/Time;Lorg/apache/spark/streaming/scheduler/StreamInputInfo;)V 
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:166) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) 
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) 
at scala.Option.orElse(Option.scala:257) 

и это мой код

public class TwitterStreaming { 
    // setup kafka : 
    public static final String ZKQuorum = "localhost:2181"; 
    public static final String ConsumerGroupID = "ingi2145-analytics"; 
    public static final String ListTopics = "newTweet"; 
    public static final String ListBrokers = "localhost:9092"; // I'm not sure about ... 

    @SuppressWarnings("deprecation") 
public static void main(String[] args) throws Exception { 
    // Location of the Spark directory 
    String sparkHome = "usr/local/spark"; 
    // URL of the Spark cluster 
    String sparkUrl = "local[4]"; 
    // Location of the required JAR files 
    String jarFile = "target/analytics-1.0.jar"; 
// Generating spark's streaming context 
JavaStreamingContext jssc = new JavaStreamingContext(
    sparkUrl, "Streaming", new Duration(1000), sparkHome, new String[]{jarFile}); 
// Start kafka stream 
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(ListTopics.split(","))); 
HashMap<String, String> kafkaParams = new HashMap<String, String>(); 
kafkaParams.put("metadata.broker.list", ListBrokers); 

//JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroupID, mapPartitionsPerTopics); 
// Create direct kafka stream with brokers and topics 
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc, 
    String.class, 
    String.class, 
    StringDecoder.class, 
    StringDecoder.class, 
    kafkaParams, 
    topicsSet 
); 

// get the json file : 
    JavaDStream<String> json = messages.map(
     new Function<Tuple2<String, String>, String>() { 
      public String call(Tuple2<String, String> tuple2) { 
       return tuple2._2(); 
      } 
    }); 

Целью данного проекта является вычисление 10 BESTS хештегов из потока твиттер с помощью очереди Кафки. Код работал без какфа. У вас есть идея, в чем проблема?

+2

Какая версия искры. У меня такая же проблема, когда я использую искру 1.4 запуска для версии 1.5 искры. Я переключаюсь на версию 1.5, и все в порядке. – giaosudau

+0

Спасибо за помощь :) – afaraut

ответ

0

У меня была такая же проблема, и это была версия искры, которую я использовал. Я использовал 1.5, затем использовал 1.4, и в конечном итоге версия, которая работала для меня, была 1.6. Итак, убедитесь, что версия Kafka, которую вы используете, совместима с версией Spark. В моем случае я использую Kafka версии 2.10-0.10.1.1 с искровым-1.6.0-bin-hadoop2.3.

Кроме того, (очень важно) убедитесь, что вы не получаете запрещенной ошибки в ваших файлах журналов. Вы должны назначить правильные разрешения безопасности папкам, используемым искру, иначе вы можете получить много ошибок, которые не имеют ничего общего с самим приложением, но с неправильной настройкой безопасности.