1

Я пишу код, в котором я пытаюсь использовать сообщения, используя kafka и искру. Но мой код не работает. Вот мой код: Не найдено ни одного приложения для журнала (org.apache.kafka.clients.producer.ProducerConfig)

import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord } 
import org.apache.spark.streaming._ 
import org.apache.spark.SparkConf 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.SparkSession 
import java.util._ 

object Smack_Kafka_Spark extends App { 
def main(args: Array[String]) { 
val kafkaBrokers = "localhost:2181" 

val kafkaOpTopic = "test" 
/*val props = new HashMap[String, Object]() 
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers) 
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
    "org.apache.kafka.common.serialization.StringSerializer") 
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
    "org.apache.kafka.common.serialization.StringSerializer")*/ 

val props = new Properties() 
props.put("bootstrap.servers", "localhost:2181") 

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 

val producer = new KafkaProducer[String, String](props) 

var spark: SparkSession = null 
val textFile: RDD[String] = spark.sparkContext.textFile("dataset.txt") 
textFile.foreach(record => { 
    val data = record.toString 
    val message = new ProducerRecord[String, String](kafkaOpTopic, null, data) 
    producer.send(message) 
}) 
producer.close() 
} 
} 

Это ошибка я получил:

log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig). 
log4j:WARN Please initialize the log4j system properly. 
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 
Exception in thread "main" java.lang.NullPointerException 
at Smack_Kafka_Spark$.main(Smack_Kafka_Spark.scala:25) 
at Smack_Kafka_Spark.main(Smack_Kafka_Spark.scala) 

Я буду очень признателен за любую помощь!

ответ

2

Вы получаете NullPointerException, потому что SparkSession - null. Создайте его, как показано ниже.

val spark : SparkSession = SparkSession.builder() 
    .appName("Smack_Kafka_Spark") 
    .master("local[*]") 
    .getOrCreate() 

Теперь читайте текстовый файл, как показано ниже.

val textFile: Dataset[String] = spark.read.textFile("dataset.txt") 

Другой вопрос, вы можете столкнуться при запуске программы является

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
Caused by: java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer 

KafkaProducer не serailizable. Вам нужно будет переместить экземпляр экземпляра KafkaProducer внутри foreachPartition. Пожалуйста, проверьте SO сообщение spark kafka producer serializable

+0

Спасибо! Как вы сказали, встал вопрос о сериализации. И затем я пошел по ссылке, которую вы упомянули. – edkeveked

+0

Это код исправлен: Это код исправлено вал Textfile: РДД [String] = spark.sparkContext.textFile ("Dataset.txt") textFile.foreachPartition ((partisions: итератор [String]) => { val производитель: KafkaProducer [String, String] = новый KafkaProducer [String, String] (реквизит) partisions.foreach ((строка: String) => { try { производитель.send (новый продюсерRecord [String, String] ("тест", линия)) } поймать { случай например: Exception => { }} })}) – edkeveked

+0

Не могли бы вы сказать мне, как я могу добавить потреблять r в этом коде, чтобы получить то, что было отправлено производителем? – edkeveked

 Смежные вопросы

  • Нет связанных вопросов^_^