0

Я разрабатываю приложение для искрообразования с помощью Kafka. У меня есть несколько вопросов следующим образом: я потоковый данные из таблиц RDBMS в Кафка и используя искровую потребителя потреблять сообщения и процесс с использованием Спарка - SQLПередача данных в реальном времени с использованием Apache Spark и kfaka

Вопросы: 1. я потоковый данными из таблицы и потоковый Кафка, как (ключ как имя таблицы, значение как данные таблицы в виде записей JSON) - это правильная архитектура?

  1. В искровом потребителе я пытаюсь использовать данные с помощью DStream.foreachRDD (x => преобразование в x RDD) - у меня проблема с этим (он говорит об ошибке с преобразованием внутри преобразование не разрешено ... Я пытаюсь извлечь ключи внутри функции foreachRDD, чтобы получить имена таблиц и преобразовать значения x.values ​​с использованием функции карты для преобразования из JSON в обычную строку, а затем сохранить каждую запись в Spark-sql)

Является ли эта архитектура и дизайн для потоковой передачи базы данных ОК и как я могу решить трансформацию в рамках проблемы трансформации?

С уважением, Piyush Кансал

+0

обмена код, где у Вас есть ошибка будет полезно понять ошибку better.if возможного кода, пожалуйста, поделитесь фрагмент кода – yoga

ответ

1

У меня есть подобный потребительной случай.

Я использую Nifi для получения данных из RDBMS Views и помещается в тему Kafka. У меня есть Тема для каждого вида в Oracle Database с несколькими разделами. Используя Nifi, данные преобразуются в формат JSON и помещаются в Kafka.

Есть ли какие-либо требования для использования одной и той же темы kafka для всех табличных данных?

Ниже код будет использоваться для сохранения данных в Кассандре.

> val msg = KafkaUtils.createDirectStream[String, String, StringDecoder, 
> StringDecoder](ssc, kafkaParams, topicsSet) 
> 
>  /* Process Records for each RDD */ Holder.log.info("Spark foreach Starts") 
>  val data = msg.map(_._2) 
>  data.foreachRDD(rdd =>{ 
>  if(rdd.toLocalIterator.nonEmpty) { 
>  
>  
>  val messageDfRdd = sqlContext.read.json(rdd) 
var data2=messageDfRdd .map(p => employee(p.getLong(1),p.getString(4),p.getString(0),p.getString(2),p.getString(3),p.getString(5))); 

> //code to save to Cassandra. 
>   } 
+0

я просто здание ДОУ. поэтому я не уверен, правильно ли дизайн, который я выполняю, или нет. Я просто хочу, чтобы manupulate некоторые данные в eachRDD Dstream на основе некоторых ключей. –

+0

'code' вал ССК = новый StreamingContext (подкожно, Секунды (10)) вал sqlContext = новый SQLContext (подкожно) знач записей: ReceiverInputDStream [(String, String)] = KafkaUtils.createStream [String, String, StringDecoder, StringDecoder] (ssc, kafkaParams, Map («MozartComposerDB» -> 5), StorageLevel.DISK_ONLY) val recordsKey = records.transform (x => x.groupByKey().) val arr = sc.accumulator ("") // новый ArrayBuffer [String](); recordsKey.foreachRDD {ключ => // Фильтрация данных в записях Dstream на основе каждого recordsKey) } ssc.start() ssc.awaitTermination() } } –

+0

вы можете использовать reduceByKey или GroupByKey непосредственно. не требуется преобразование. val data = msg.groupByKey() val key = data.map (_._ 1) значения val = data.map (_._ 2) – yoga