2017-02-21 93 views
0

У меня есть ситуация, когда я должен фильтровать точки данных в потоке на основе некоторого состояния, связанного с ссылкой на внешние данные. Я загрузил внешние данные в Dataframe (так что я получаю запрос на него с использованием интерфейса SQL). Но когда я пытался запросить Dataframe, я вижу, что мы не можем получить доступ к нему внутри функции transform (filter). (пример кода ниже)Spark Streaming: использование внешних данных во время преобразования потока

// DStream is created and temp table called 'locations' is registered 
    dStream.filter(dp => { 
      val responseDf = sqlContext.sql("select location from locations where id='001'") 
      responseDf.show() //nothing is displayed 
      // some condition evaluation using responseDf 
      true 
    }) 

Я что-то не так? Если да, то каков был бы лучший подход для загрузки внешних данных в память и запроса на этапе трансформации потока.

ответ

0

Использование SparkSession вместо SQLContext решило проблему. Код ниже,

  val sparkSession = SparkSession.builder().appName("APP").getOrCreate() 
      val df = sparkSession.createDataFrame(locationRepo.getLocationInfo, classOf[LocationVO]) 
      df.createOrReplaceTempView("locations") 

      val dStream: DStream[StreamDataPoint] = getdStream() 

      dStream.filter(dp => { 
       val sparkAppSession = SparkSession.builder().appName("APP").getOrCreate() 
       val responseDf = sparkAppSession.sql("select location from locations where id='001'") 
       responseDf.show() // this prints the results 
       // some condition evaluation using responseDf 
       true 
      })