1
Я пытаюсь поглотить сообщение от производителя кафки через программу искрообразования.не может потреблять сообщение kafka через искровой поток
Вот моя программа
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
// lines.print()
lines.foreachRDD(rdd=>{
rdd.foreach(message=>
println(message))
})
выше программа работает успешно. Но я не видел, чтобы какое-либо сообщение печаталось.
Я не хочу использовать сбор. Он неэффективен. Так как я получаю от 10 до 50 к записей. –
Я протестировал цикл, который вы написали для печати, и это отлично работает для меня. Я просто хотел, чтобы вы проверили, используя collect(), если вы получите сообщение. Проверьте название своей темы. Вы пытались использовать kafka-console-consumer, если видите сообщения? – abaghel
Я вижу сообщение как у потребителя kafka, так и с помощью collect(). Что вы проверили, я его не понимаю? –