2016-07-20 3 views
0

Цель: Чтение Кафки с искровым потокового и хранения данных в Кассандре По: разъем Спарк CASSANDRA Java 1.6 Входные данные: простой объект JSON строка { «ID»: «1», «Field1»:» value1}Java Спарк потокового Кассандре

i've класса Java для чтения из Кафка искровым потоковой обработки данных чтения, а затем сохранить его в Кассандре

здесь основной код:.

**JavaPairReceiverInputDStream**<String, String> messages = 
      KafkaUtils.createStream(ssc, 
        targetKafkaServerPort, targetTopic, topicMap); 

    **JavaDStream** list = messages.map(new Function<Tuple2<String,String>,List<Object>>(){ 
     public List<Object> call( Tuple2<String,String> tuple2){ 
      List<Object> **list**=new ArrayList<Object>(); 

      Gson gson = new Gson(); 
      MyClass myclass = gson.fromJson(tuple2._2(), MyClass.class); 
      myclass.setNewData("new_data"); 
      String jsonInString = gson.toJson(myclass); 
      list.add(jsonInString); 
      return list; 
     } 
    }); 

следующая неверный код:

**javaFunctions**(list) 
      .writerBuilder("schema", "table", mapToRow(JavaDStream.class)) 
      .saveToCassandra(); 

Поскольку «javaFunctions» метод ожидают объект JavaRDD и «список» является JavaDStream ...

я бы нужно бросить JavaDStream к JavaRDD но я не найти правильный путь. ..

Любая помощь?

ответ

0

Давайте использовать импорт статического com.datastax.spark.connector.japi.CassandraStreamingJavaUtil. * Вместо com.datastax.spark.connector.japi.CassandraJavaUtil. *

0

Ummmm не очень ... Что I' ве сделать, это использовать foreachRDD после создания dsStream:

dStream.foreachRDD(new Function<JavaRDD<MyObject>, Void>() { 
     @Override 
     public Void call(JavaRDD<MyObject> rdd) throws Exception { 
      if (rdd != null) { 
       javaFunctions(rdd) 
         .writerBuilder("schema", "table", mapToRow(MyObject.class)) 
         .saveToCassandra(); 
       logging(" --> Saved data to cassandra",1,null); 
      } 

      return null; 
     } 
    }); 

Надежда быть полезным ...