1

Я пытаюсь использовать искру sql для запроса данных, поступающих из kafka, используя zeppelin для анализа тенденции в реальном времени, но без успеха.Как запросить Spark StreamingContext с искровым sql в zeppelin?

здесь простые фрагменты кода, что я бегу в дирижабле

//Load Dependency 
%dep 
    z.reset() 
    z.addRepo("Spark Packages Repo").url("http://repo1.maven.org/maven2/") 
    z.load("org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1") 
    z.load("org.apache.spark:spark-core_2.11:2.0.1") 
    z.load("org.apache.spark:spark-sql_2.11:2.0.1") 
    z.load("org.apache.spark:spark-streaming_2.11:2.0.1" 

//simple streaming 
%spark 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.spark.rdd.RDD 
import org.apache.spark.{SparkContext, SparkConf} 
import org.apache.spark.sql.{Row, SQLContext} 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming.kafka.KafkaUtils 
import _root_.kafka.serializer.StringDecoder 
import org.apache.spark.sql.SparkSession 

val conf = new SparkConf() 
    .setAppName("clickstream") 
    .setMaster("local[*]") 
    .set("spark.streaming.stopGracefullyOnShutdown", "true") 
    .set("spark.driver.allowMultipleContexts","true") 


val spark = SparkSession 
    .builder() 
    .appName("Spark SQL basic example") 
    .config(conf) 
    .getOrCreate() 

val ssc = new StreamingContext(conf, Seconds(1)) 

val topicsSet = Set("timer") 
val kafkaParams = Map[String, String]("metadata.broker.list" -> "192.168.25.1:9091,192.168.25.1:9092,192.168.25.1:9093") 

val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, topicsSet).map(_._2) 

lines.window(Seconds(60)).foreachRDD{ rdd => 
    val clickDF = spark.read.json(rdd) //doesn't have to be json 
    clickDF.createOrReplaceTempView("testjson1") 
    //olderway 
    //clickDF.registerTempTable("testjson2") 
    clickDF.show 

} 

lines.print() 
ssc.start() 
ssc.awaitTermination() 

я в состоянии напечатать каждое сообщение Кафка, но когда я запускаю простой SQL %sql select * from testjson1 // or testjson2, я получаю следующую ошибку

java.util.NoSuchElementException: None.get 
at scala.None$.get(Option.scala:347) 
at scala.None$.get(Option.scala:345) 
at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343) 
at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:646) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:281) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

В сообщении this post запрашивается потоковая передача данных (с примером твиттера). Поэтому я думаю, что это возможно с кафкой. Так что, наверное, может быть, я делаю что-то неправильно или пропустил какой-то момент?

Любые идеи, предложения, рекомендации приветствуются

+0

В оригинале используется rdd.toDf(). RegisterTempTable (...) вместо createOrReplaceTempView (...). Вы пробовали «старый путь»? Мне также интересно: если есть несколько RDD для обработки, последний будет доступен для выбора только после того, как каждый результат обработки rdd перезапишет предыдущие - правильно? –

+0

Я пробовал в обоих направлениях ... но никто не создает временное представление для запроса. – sagarthapa

ответ

1

Сообщение об ошибке не говорит о том, что вид температуры отсутствует. Сообщение об ошибке сообщает, что тип None не предоставляет элемент с именем «get».

С помощью искры вычисления, основанные на RDD, выполняются при вызове действия. Таким образом, до момента создания временной таблицы вычисления не выполняются. Все вычисления выполняются при выполнении запроса в таблице. Если ваша таблица не будет существовать, вы получите другое сообщение об ошибке.

Возможно, сообщения Kafka могут быть напечатаны, но ваше исключение сообщает, что экземпляр None не знает «get». Поэтому я считаю, что ваши исходные данные JSON содержат элементы без данных, и эти элементы представлены None и поэтому вызывают выполнение, когда искра выполняет вычисления.

Я предлагаю вам проверить, работает ли ваше решение в целом, путем тестирования, если оно работает с образцовыми данными, которые не содержат пустых элементов JSON.