2016-10-19 3 views
1

Я работаю над DataBricks (Spark 2.0.1-db1 (Scala 2.11)), и я пытаюсь использовать функции Spark Streaming. Я использую библиотеки:
- искровой SQL-потоковый mqtt_2.11-2.1.0-SNAPSHOT.jar (см здесь: http://bahir.apache.org/docs/spark/current/spark-sql-streaming-mqtt/)Spark Streaming MQTT - Применить схему в наборе данных

Следующая команда дает мне набор данные:

val lines = spark.readStream 
     .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") 
     .option("clientId", "sparkTest") 
     .option("brokerUrl", "tcp://xxx.xxx.xxx.xxx:xxx") 
     .option("topic", "/Name/data") 
     .option("localStorage", "dbfs:/models/mqttPersist") 
     .option("cleanSession", "true") 
     .load().as[(String, Timestamp)] 

с этим printSchema:

root 
|-- value : string (nullable : true) 
|-- timestamp : timestamp (nullable : true) 

И я хотел бы применить схему на колонке «значение» моего набора данных. вы можете видеть мою схему json следующим образом.

root 
|-- id : string (nullable = true) 
|-- DateTime : timestamp (nullable = true) 
|-- label : double (nullable = true) 

Можно ли непосредственно разобрать мой JSON в потоке, чтобы получить что-то вроде этого:

root 
|-- value : struct (nullable : true) 
    |-- id : string (nullable = true) 
    |-- DateTime : timestamp (nullable = true) 
    |-- label : double (nullable = true) 
|-- timestamp : timestamp (nullable : true) 

На данный момент я не вижу какой-либо способ разбора JSON из MQTT и любой помощь будет очень велика.

Заранее спасибо.

ответ

0

У меня была такая же точная проблема сегодня! Я использовал json4s и Jackson для разбора json.

Как я получаю потоковый DataSet (в значительной степени то же самое, что у вас есть):

val lines = spark.readStream 
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") 
    .option("topic", topic) 
    .option("brokerUrl",brokerUrl) 
    .load().as[(String,Timestamp)] 

Я определил схему, используя тематический класс:

case class DeviceData(devicename: String, time: Long, metric: String, value: Long, unit: String) 

PARSING из колонки в формате JSON используя org.json4s.jackson.JsonMethods.parse:

val ds = lines.map { 
    row => 
    implicit val format = DefaultFormats 
    parse(row._1).extract[DeviceData] 
} 

Вывод результатов:

val query = ds.writeStream 
    .format("console") 
    .option("truncate", false) 
    .start() 

Результаты:

+----------+-------------+-----------+-----+----+ 
|devicename|time   |metric  |value|unit| 
+----------+-------------+-----------+-----+----+ 
|dht11_4 |1486656575772|temperature|9 |C | 
|dht11_4 |1486656575772|humidity |36 |% | 
+----------+-------------+-----------+-----+----+ 

Я отчасти разочарован, я не могу придумать решение, которое использует Sparks родной JSon разборе. Вместо этого мы должны полагаться на Джексона. Вы можете использовать собственный синтаксический анализ json, если вы читаете файл как поток. Таким образом:

val lines = spark.readStream 
    ..... 
    .json("./path/to/file").as[(String,Timestamp)] 

Но для MQTT мы не можем этого сделать.