Я пытаюсь подключить ApacheSpark Structured Stream к теме MQTT (платформа IBM Watson IoT на IBM Bluemix в этом случае).Схема, потерянная с ApacheBahir Stuctured Streaming коннектор на потоке ApacheSpark
Я создаю структурированный поток следующим образом:
val df = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("username","a-vy0z2s-q6s8r693hv")
.option("password","B+UX(aWuFPvX")
.option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf")
.option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json")
.load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883")
До сих пор так хорошо, в РЕПЛ я вернусь этот объект ф.р. следующим образом:
ДФ: org.apache.spark .sql.DataFrame = [значение: строка, метка времени: метка времени]
я узнал из this thread, что я должен изменить идентификатор клиента когда-либо y время, котор я соединяю. Так что это решение, но если я начну читать из потока, используя эту строку:
val query = df.writeStream. outputMode ("добавить"). .
формат ("консоль") старт()
Тогда в результате схема выглядит следующим образом:
ДФ: org.apache.spark.sql.DataFrame = [значение: строка, метка времени : метка времени]
И данные следующим образом:
Это означает, что поток JSON преобразуется в поток строкового объекта, содержащий представление JSON.
Является ли это ограничением ApacheBahir?
также предоставление схемы не помогает, так как следующий код напоминает в тот же результат:
import org.apache.spark.sql.types._
val schema = StructType(
StructField("count",LongType,true)::
StructField("flowrate",LongType,true)::
StructField("fluidlevel",StringType,true)::
StructField("frequency",LongType,true)::
StructField("hardness",LongType,true)::
StructField("speed",LongType,true)::
StructField("temperature",LongType,true)::
StructField("ts",LongType,true)::
StructField("voltage",LongType,true)::
Nil)
:paste
val df = spark.readStream
.schema(schema)
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("username","a-vy0z2s-q6s8r693hv")
.option("password","B+UX(a8GFPvX")
.option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf4")
.option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json")
.load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883")