2017-02-11 17 views
0

Теперь я пытаюсь соединиться от MQTT сообщений Кафок (на самом деле искра потокового в Кафке)Как CONVER полезных данных MQTT к Кафке строкового типа

Я использовал этот Коннектер https://github.com/evokly/kafka-connect-mqtt

И Спарк-2.1. 0, Кафка - 0.10.1.1

Saprk потокового вывода, такие как этот

({"schema":{"type":"string","optional":false},"payload":"mqtt"},{"schema":{"type":"bytes","optional":false},"payload":"MTIzMTIz"}) 

и продюсер кода

object mqttProducer { 
def main(args: Array[String]) { 
val brokerUrl = "tcp://ip" 
val topic = "mqtt" 
val msg = "123123" 

var client: MqttClient = null 

// Creating new persistence for mqtt client 
val persistence = new MqttDefaultFilePersistence("/tmp") 

try { 
    // mqtt client with specific url and client id 
    client = new MqttClient(brokerUrl, MqttClient.generateClientId, persistence) 

    client.connect() 

    val msgTopic = client.getTopic(topic) 
    val message = new MqttMessage(msg.getBytes("utf-8")) 

    while (true) { 
    msgTopic.publish(message) 
    println("Publishing Data, Topic : %s, Message : %s".format(msgTopic.getName, message)) 
    Thread.sleep(1000) 
    } 
} 

catch { 
    case e: MqttException => println("Exception Caught: " + e) 
} 

finally { 
    client.disconnect() 
} 

и искровым потоковое Кафка потребитель код

package hb.test1 
import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark.SparkConf 
import org.apache.spark.streaming.Seconds 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 
import org.apache.spark.streaming.kafka010.KafkaUtils 
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 

object test2 { 

    def main(args: Array[String]): Unit = { 

val sparkConf = new SparkConf().setAppName("app") 
val ssc = new StreamingContext(sparkConf, Seconds(1))  


val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> 
    "servers ip", 
    "key.deserializer" -> classOf[StringDeserializer], 
    "value.deserializer" -> classOf[StringDeserializer], 
    "group.id" -> "use_a_separate_group_id_for_each_stream", 
    "auto.offset.reset" -> "latest", 
    "enable.auto.commit" -> (false: java.lang.Boolean) 
) 
val topics = Array("mqtt-kafka") 
    val stream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    PreferConsistent, 
    Subscribe[String, String](topics, kafkaParams) 
) 

val testStream = stream.map(x => (x.key, x.value)) 


testStream.print() 

ssc.start() 
ssc.awaitTermination() 
    } 
} 

Как я мог получить строки не байт? помогите ребятам

ответ

1

Эта полезная нагрузка «MTIzMTIz» - это строка «123123», закодированная только base64. Если вы хотите просто взять полезную нагрузку MQTT и отправить ее в Kafka без кодировки base64, вы должны использовать ByteArrayConverter. В моей конфигурации для того же разъема MQTT я установить преобразователь значений, как так:

«value.converter»: «io.confluent.connect.replicator.util.ByteArrayConverter»

выше ByteArrayConverter поставляется с вырожденным Распространение предприятия, но есть и другие open-source Kafka Connect ByteArrayConverters, такие как тот, который включен в соединитель kbka-connect-s3 qubole/streamx.

https://github.com/qubole/streamx/blob/8b9d43008d9901b8e6020404f137944ed97522e2/src/main/java/com/qubole/streamx/ByteArrayConverter.java

Существует KIP-128, чтобы добавить стандартный ByteArrayConverter в рамках Кафка Connect

https://cwiki.apache.org/confluence/display/KAFKA/KIP-128%3A+Add+ByteArrayConverter+for+Kafka+Connect

UPDATE: Кафка 0,11 теперь выпущен и корабли с ByteArrayConverter. Настройте "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter" и вы получите необработанную полезную нагрузку mqtt без кодировки Base64.

 Смежные вопросы

  • Нет связанных вопросов^_^