2016-12-19 1 views
2

Я пытаюсь передать данные из темы kafka с помощью приложения scala. Я могу получить данные из этой темы, но как создать из нее кадр данных?Неправильный вывод при создании DataFrame

Вот данные (в строке, формат строки)

{ 
    "action": "AppEvent", 
    "tenantid": 298, 
    "lat": 0.0, 
    "lon": 0.0, 
    "memberid": 16390, 
    "event_name": "CATEGORY_CLICK", 
    "productUpccd": 0, 
    "device_type": "iPhone", 
    "device_os_ver": "10.1", 
    "item_name": "CHICKEN" 
} 

Я попробовал несколько способов сделать это, но это не приносит удовлетворительных результатов.

+--------------------+ |     _1| 
+--------------------+ |{"action":"AppEve...| |{"action":"AppEve...| |{"action":"AppEve...| |{"action":"AppEve...| |{"action":"AppEve...| 
|{"action":"AppEve...| |{"action":"AppEve...| |{"action":"AppEve...| 
|{"action":"AppEve...| |{"action":"AppEve...| 

Может ли кто-нибудь сказать Как сделать сопоставление, чтобы каждое поле входило в отдельный столбец, подобный таблице. данные в формате avro.

вот код, который получает данные из этой темы.

val ssc = new StreamingContext(sc, Seconds(2)) 
val kafkaConf = Map[String, String]("metadata.broker.list" -> "####", 
    "zookeeper.connect" -> "########", 
    "group.id" -> "KafkaConsumer", 
    "zookeeper.connection.timeout.ms" -> "1000000") 
val topicMaps = Map("fishbowl" -> 1) 
val messages = KafkaUtils.createStream[String, String,DefaultDecoder, DefaultDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER).map(_._2) 

, пожалуйста, руководство меня, как использовать foreachRDD FUNC и карту(), чтобы создать правильный кадр данных

+0

Пробовали ли вы сделать поиск? [spark-streaming + dataframe] (http://stackoverflow.com/search?q=%5Bspark-streaming%5D+dataframe) – maasg

+0

Это не полезно, так как im новое для scala.I не мог понять, как конвертировать avro [String, String] в dataframe http://stackoverflow.com/questions/41237929/value-toint-is-not-a-member-of-object –

+0

Вот мой ответ Спасибо Maasg нашел ответ –

ответ

2

Чтобы создать dataframe из РДА независимо от случая классовой схемы. Используйте ниже логик

stream.foreachRDD(
    rdd => { 
    val dataFrame = sqlContext.read.json(rdd.map(_._2)) 
dataFrame.show() 
     }) 

Здесь поток РД создан из kafkaUtils.createStream()

+0

Молодцы. Что касается комментария «независимо от его формата или схемы класса case», это не совсем правильно => Это работает только для записей в формате JSON. – maasg

+0

@maasg thanks sir, отредактировал мой комментарий. Поскольку я работал с avro (все еще его схема находится в json) –

 Смежные вопросы

  • Нет связанных вопросов^_^