Я пытаюсь передать данные из темы kafka с помощью приложения scala. Я могу получить данные из этой темы, но как создать из нее кадр данных?Неправильный вывод при создании DataFrame
Вот данные (в строке, формат строки)
{
"action": "AppEvent",
"tenantid": 298,
"lat": 0.0,
"lon": 0.0,
"memberid": 16390,
"event_name": "CATEGORY_CLICK",
"productUpccd": 0,
"device_type": "iPhone",
"device_os_ver": "10.1",
"item_name": "CHICKEN"
}
Я попробовал несколько способов сделать это, но это не приносит удовлетворительных результатов.
+--------------------+ | _1|
+--------------------+ |{"action":"AppEve...| |{"action":"AppEve...| |{"action":"AppEve...| |{"action":"AppEve...| |{"action":"AppEve...|
|{"action":"AppEve...| |{"action":"AppEve...| |{"action":"AppEve...|
|{"action":"AppEve...| |{"action":"AppEve...|
Может ли кто-нибудь сказать Как сделать сопоставление, чтобы каждое поле входило в отдельный столбец, подобный таблице. данные в формате avro.
вот код, который получает данные из этой темы.
val ssc = new StreamingContext(sc, Seconds(2))
val kafkaConf = Map[String, String]("metadata.broker.list" -> "####",
"zookeeper.connect" -> "########",
"group.id" -> "KafkaConsumer",
"zookeeper.connection.timeout.ms" -> "1000000")
val topicMaps = Map("fishbowl" -> 1)
val messages = KafkaUtils.createStream[String, String,DefaultDecoder, DefaultDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER).map(_._2)
, пожалуйста, руководство меня, как использовать foreachRDD FUNC и карту(), чтобы создать правильный кадр данных
Пробовали ли вы сделать поиск? [spark-streaming + dataframe] (http://stackoverflow.com/search?q=%5Bspark-streaming%5D+dataframe) – maasg
Это не полезно, так как im новое для scala.I не мог понять, как конвертировать avro [String, String] в dataframe http://stackoverflow.com/questions/41237929/value-toint-is-not-a-member-of-object –
Вот мой ответ Спасибо Maasg нашел ответ –