я написал ниже код, чтобы потреблять данные с помощью искровой Работа,Как получить данные об использовании, что поступающие от Кафки, используя искру
Есть ли что-нибудь не хватает для потокового Кафки или обработки данных, после того, как получил получить? Как я могу проверить, что данные извлекаются или нет?
// StreamingExamples.setStreamingLogLevels();
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[*]");
;
// Create the context with 2 seconds batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
Map<String, Integer> topicMap = new HashMap<>();
topicMap.put("Ptopic", 1);
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, "localhost:2181", "5",
topicMap);
/*messages.foreach(new Function<JavaRDD<String, String>, Void>() {
public Void call(JavaRDD<String, String> accessLogs) {
return null;
}}
);*/
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
/*System.out.println(tuple2._1().toString());
System.out.println(tuple2._2().toString());*/
return tuple2._2();
}
});
lines.print();
jssc.start();
jssc.awaitTermination();
Вот результат только печать ..
Вы проверить это: https://spark.apache.org/docs/1.6.1/streaming-kafka -integration.html? И, может быть, это хорошая идея сначала создать производителя и потребителя кафки через терминал. если он работает, вы должны начать пытаться интегрировать искру. – lidox
Да. Я попробовал это. В этом коде я могу получить результаты в переменной «lines», но теперь Как получить строку извлечения, используя forEachRDD? так что я могу выполнить некоторую операцию над этой строкой –
, вы можете потопить ваш RDD, используя базу данных позади. как показано на рисунке: http://spark.apache.org/docs/latest/img/streaming-arch.png после этого вы можете загружать данные и выполнять некоторые операции – lidox