2017-02-09 13 views
0

Я пытаюсь сделать java искрообразование с помощью Cassandra. Я сделал то же самое с Scala, но я не знаю, как действовать на Java. Веб не дал мне никаких примеров с java spark streaming и Cassandra.Java Spark Streaming с Cassandra

Может кто-то пожалуйста, покажите мне, как есть ниже Scala код в Java:

import org.apache.spark.streaming.dstream.ConstantInputDStream 

val ssc = new StreamingContext(conf, Seconds(10)) 

val cassandraRDD = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu") 

val dstream = new ConstantInputDStream(ssc, cassandraRDD) 

dstream.foreachRDD{ rdd => 
    // any action will trigger the underlying cassandra query, using collect to have a simple output 
    println(rdd.collect.mkString("\n")) 
} 
ssc.start() 
ssc.awaitTermination() 

Любая помощь приветствуется. Спасибо

ответ

1

В вашем преобразовании foreachRDD вы можете преобразовать свои данные в соответствии с форматом таблицы кассандры.

JavaRDD<TestBean> cassandraRDD = testRDD 
       .flatMap(new FlatMapFunction<Tuple2<String, List<Map<String, Object>>>, TestBean>() { 

        private static final long serialVersionUID = 1L; 

        @Override 
        public Iterable<TestBean> call(Tuple2<String, List<Map<String, Object>>> tuple) throws Exception { 

         return rawData; 
        } 
       }); 

      javaFunctions(jsonRDD).writerBuilder(CASSANDRA_KEYSPACE,CASSANDRA_TABLE, mapToRow(TestBean.class)).saveToCassandra(); 
+0

Это мои операторы импорта импорт статических com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions; импорт статический com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow; –