2017-02-06 7 views
0

Я пытаюсь подключить ApacheSpark Structured Stream к теме MQTT (платформа IBM Watson IoT на IBM Bluemix в этом случае).Схема, потерянная с ApacheBahir Stuctured Streaming коннектор на потоке ApacheSpark

Я создаю структурированный поток следующим образом:

val df = spark.readStream 
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") 
    .option("username","a-vy0z2s-q6s8r693hv") 
    .option("password","B+UX(aWuFPvX") 
    .option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf") 
    .option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json") 
    .load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883") 

До сих пор так хорошо, в РЕПЛ я вернусь этот объект ф.р. следующим образом:

ДФ: org.apache.spark .sql.DataFrame = [значение: строка, метка времени: метка времени]

я узнал из this thread, что я должен изменить идентификатор клиента когда-либо y время, котор я соединяю. Так что это решение, но если я начну читать из потока, используя эту строку:

val query = df.writeStream. outputMode ("добавить"). .
формат ("консоль") старт()

Тогда в результате схема выглядит следующим образом:

ДФ: org.apache.spark.sql.DataFrame = [значение: строка, метка времени : метка времени]

И данные следующим образом:

enter image description here

Это означает, что поток JSON преобразуется в поток строкового объекта, содержащий представление JSON.

Является ли это ограничением ApacheBahir?

также предоставление схемы не помогает, так как следующий код напоминает в тот же результат:

import org.apache.spark.sql.types._ 
val schema = StructType(
    StructField("count",LongType,true):: 
    StructField("flowrate",LongType,true):: 
    StructField("fluidlevel",StringType,true):: 
    StructField("frequency",LongType,true):: 
    StructField("hardness",LongType,true):: 
    StructField("speed",LongType,true):: 
    StructField("temperature",LongType,true):: 
    StructField("ts",LongType,true):: 
    StructField("voltage",LongType,true):: 
Nil) 

:paste 
val df = spark.readStream 
    .schema(schema) 
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") 
    .option("username","a-vy0z2s-q6s8r693hv") 
    .option("password","B+UX(a8GFPvX") 
    .option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf4") 
.option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json") 
    .load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883") 

ответ

1

DataSources Многие, в том числе, but not limited toMQTTStreamSource, имеют фиксированные схемы, которые состоят из сообщения и отметку времени. Схема не потеряна, просто не анализируется, и это ожидаемое поведение.

Если схема фиксирована и известна фронт, вы должны быть в состоянии использовать from_json функцию:

import org.apache.spark.sql.functions.from_json 

df.withColumn("value", from_json($"value", schema)) 
0

Для разбора (как я не четырехдневных «from_json» метод больше) Я использовал

импорт org.apache.spark.sql.functions.json_tuple

и следующий код, он работает, а также:

df.withColumn ("значение", json_tuple ($ "значение", "myColumnName"))