Я работаю над DataBricks (Spark 2.0.1-db1 (Scala 2.11)), и я пытаюсь использовать функции Spark Streaming. Я использую библиотеки:
- искровой SQL-потоковый mqtt_2.11-2.1.0-SNAPSHOT.jar (см здесь: http://bahir.apache.org/docs/spark/current/spark-sql-streaming-mqtt/)Spark Streaming MQTT - Применить схему в наборе данных
Следующая команда дает мне набор данные:
val lines = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("clientId", "sparkTest")
.option("brokerUrl", "tcp://xxx.xxx.xxx.xxx:xxx")
.option("topic", "/Name/data")
.option("localStorage", "dbfs:/models/mqttPersist")
.option("cleanSession", "true")
.load().as[(String, Timestamp)]
с этим printSchema:
root
|-- value : string (nullable : true)
|-- timestamp : timestamp (nullable : true)
И я хотел бы применить схему на колонке «значение» моего набора данных. вы можете видеть мою схему json следующим образом.
root
|-- id : string (nullable = true)
|-- DateTime : timestamp (nullable = true)
|-- label : double (nullable = true)
Можно ли непосредственно разобрать мой JSON в потоке, чтобы получить что-то вроде этого:
root
|-- value : struct (nullable : true)
|-- id : string (nullable = true)
|-- DateTime : timestamp (nullable = true)
|-- label : double (nullable = true)
|-- timestamp : timestamp (nullable : true)
На данный момент я не вижу какой-либо способ разбора JSON из MQTT и любой помощь будет очень велика.
Заранее спасибо.