Цель: Чтение Кафки с искровым потокового и хранения данных в Кассандре По: разъем Спарк CASSANDRA Java 1.6 Входные данные: простой объект JSON строка { «ID»: «1», «Field1»:» value1}Java Спарк потокового Кассандре
i've класса Java для чтения из Кафка искровым потоковой обработки данных чтения, а затем сохранить его в Кассандре
здесь основной код:.
**JavaPairReceiverInputDStream**<String, String> messages =
KafkaUtils.createStream(ssc,
targetKafkaServerPort, targetTopic, topicMap);
**JavaDStream** list = messages.map(new Function<Tuple2<String,String>,List<Object>>(){
public List<Object> call( Tuple2<String,String> tuple2){
List<Object> **list**=new ArrayList<Object>();
Gson gson = new Gson();
MyClass myclass = gson.fromJson(tuple2._2(), MyClass.class);
myclass.setNewData("new_data");
String jsonInString = gson.toJson(myclass);
list.add(jsonInString);
return list;
}
});
следующая неверный код:
**javaFunctions**(list)
.writerBuilder("schema", "table", mapToRow(JavaDStream.class))
.saveToCassandra();
Поскольку «javaFunctions» метод ожидают объект JavaRDD и «список» является JavaDStream ...
я бы нужно бросить JavaDStream к JavaRDD но я не найти правильный путь. ..
Любая помощь?