У меня есть ситуация, когда я должен фильтровать точки данных в потоке на основе некоторого состояния, связанного с ссылкой на внешние данные. Я загрузил внешние данные в Dataframe (так что я получаю запрос на него с использованием интерфейса SQL). Но когда я пытался запросить Dataframe, я вижу, что мы не можем получить доступ к нему внутри функции transform (filter). (пример кода ниже)Spark Streaming: использование внешних данных во время преобразования потока
// DStream is created and temp table called 'locations' is registered
dStream.filter(dp => {
val responseDf = sqlContext.sql("select location from locations where id='001'")
responseDf.show() //nothing is displayed
// some condition evaluation using responseDf
true
})
Я что-то не так? Если да, то каков был бы лучший подход для загрузки внешних данных в память и запроса на этапе трансформации потока.