2016-11-15 3 views
2

я написал ниже код, чтобы потреблять данные с помощью искровой Работа,Как получить данные об использовании, что поступающие от Кафки, используя искру

Есть ли что-нибудь не хватает для потокового Кафки или обработки данных, после того, как получил получить? Как я могу проверить, что данные извлекаются или нет?

// StreamingExamples.setStreamingLogLevels(); 
     SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[*]"); 
     ; 
     // Create the context with 2 seconds batch size 
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000)); 

     Map<String, Integer> topicMap = new HashMap<>(); 
     topicMap.put("Ptopic", 1); 

     JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, "localhost:2181", "5", 
       topicMap); 

     /*messages.foreach(new Function<JavaRDD<String, String>, Void>() { 
      public Void call(JavaRDD<String, String> accessLogs) { 
        return null; 
       }} 
      );*/ 

     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 
      @Override 
      public String call(Tuple2<String, String> tuple2) { 
       /*System.out.println(tuple2._1().toString()); 
       System.out.println(tuple2._2().toString());*/ 
       return tuple2._2(); 
      } 
     }); 

     lines.print(); 
     jssc.start(); 
     jssc.awaitTermination(); 

Вот результат только печать ..

+0

Вы проверить это: https://spark.apache.org/docs/1.6.1/streaming-kafka -integration.html? И, может быть, это хорошая идея сначала создать производителя и потребителя кафки через терминал. если он работает, вы должны начать пытаться интегрировать искру. – lidox

+0

Да. Я попробовал это. В этом коде я могу получить результаты в переменной «lines», но теперь Как получить строку извлечения, используя forEachRDD? так что я могу выполнить некоторую операцию над этой строкой –

+0

, вы можете потопить ваш RDD, используя базу данных позади. как показано на рисунке: http://spark.apache.org/docs/latest/img/streaming-arch.png после этого вы можете загружать данные и выполнять некоторые операции – lidox

ответ

1

Вы можете использовать основные функции больших данных, как карты, уменьшить, flatmap и так далее.

Update 1:

JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 
     @Override 
     public String call(Tuple2<String, String> tuple2) { 
      /*System.out.println(tuple2._1().toString()); 
      System.out.println(tuple2._2().toString());*/ 
      return tuple2._2(); 
     } 
    }); 

    // TODO: make some transformation here: 
    lines = lines.map(x -> { // clean data 
     String callType = x.getCallType().replaceAll("\"", "").replaceAll("[-|,]", ""); // here some operations 
     x.setCallType(callType); 
     return x; 
    }).filter(pair -> { // filter data 
     return !isFilteredOnFire || pair.getCallType().matches("(?i).*\\bFire\\b.*"); // here so filters 
    }); 

    lines.print(); 
    jssc.start(); 
    jssc.awaitTermination(); 

Полный пример описан в this blog post

+0

Код доступен в вопросах, для переменной «lines», как это будет работать по этому коду и откуда я могу отправить данные в kafka? –

+0

, может быть, проверьте это: http://stackoverflow.com/questions/31590592/how-to-write-to-kafka-from-spark -streaming – lidox

+0

В принципе, вы хотите, чтобы метод погружал искровой поток в кафку, не так ли? – lidox