1

Я пытаюсь поглотить сообщение от производителя кафки через программу искрообразования.не может потреблять сообщение kafka через искровой поток

Вот моя программа

val Array(zkQuorum, group, topics, numThreads) = args 
     val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local") 
     val ssc = new StreamingContext(sparkConf, Seconds(5)) 

     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap 
     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) 
    // lines.print() 
lines.foreachRDD(rdd=>{ 
      rdd.foreach(message=> 
     println(message)) 
    }) 

выше программа работает успешно. Но я не видел, чтобы какое-либо сообщение печаталось.

ответ

1

Установите мастер URL с помощью "local[*]"

val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]") 

Вы также можете попытаться вызвать сбор() и посмотреть, если вы получаете сообщения.

lines.foreachRDD { rdd => 
     rdd.collect().foreach(println) 
} 
+0

Я не хочу использовать сбор. Он неэффективен. Так как я получаю от 10 до 50 к записей. –

+0

Я протестировал цикл, который вы написали для печати, и это отлично работает для меня. Я просто хотел, чтобы вы проверили, используя collect(), если вы получите сообщение. Проверьте название своей темы. Вы пытались использовать kafka-console-consumer, если видите сообщения? – abaghel

+0

Я вижу сообщение как у потребителя kafka, так и с помощью collect(). Что вы проверили, я его не понимаю? –