0

Я пытаюсь сделать запрос,Как разобрать данные из 2 x потоков Kinesis в 1 x Spark Streaming App?

select a.user_id , b.domain from realTimeTable_1 as a join realTimeTable_2 as b on a.device_id = b.device_id

с использованием двух кинезисов Потоков. Однако выход из Stream2 отсутствует, кто-нибудь знает, как объединить или записать одновременно два потока данных в hbase или паркет? Вот мой код, я установить SparkConf().set("spark.streaming.concurrentJobs", "2") обрабатывать оба потока:

val numShards_s1 = kinesisClient.describeStream("stream1").getStreamDescription().getShards().size 
val numShards_s2 = kinesisClient.describeStream("stream2").getStreamDescription().getShards().size 
val numStreams_s1 = numShards_s1 
val numStreams_s2 = numShards_s2 
val batchInterval = Seconds(5) 
val kinesisClient = new AmazonKinesisClient(credentials)kinesisClient.setEndpoint("https://kinesis.us-east-1.amazonaws.com") 
val kinesisCheckpointInterval = batchInterva 
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName() 
val ssc = new StreamingContext(sc, batchInterval) 
val kinesisStreams_s1 = (0 until numStreams_s1).map { i => 
    KinesisUtils.createStream(ssc, "stream-demo", "stream1", endpointUrl, regionName,InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2) 
} 
    val kinesisStreams_s2 = (0 until numShards_s2).map { i => 
    KinesisUtils.createStream(ssc, "stream-demo", "stream2", endpointUrl, regionName,InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2) 
} 
val unionStreams_s1 = ssc.union(kinesisStreams_s1) 
val unionStreams_s2 = ssc.union(kinesisStreams_s2) 
val schemaString_s1 = "user_id,device_id,action,timestamp 
val schemaString_s2= "device_id,domain,timestamp 
val tableSchema_s1 = StructType(schemaString_s1.split(",").map(fieldName => StructField(fieldName, StringType, true))) 
val tableSchema_s2 = StructType(schemaString_s2.split(",").map(fieldName => StructField(fieldName, StringType, true))) 

unionStreams_s1.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => { 
    val rowRDD = rdd.map(w => Row.fromSeq(new String(w).split(","))) 
    val output1 = sqlContext.createDataFrame(rowRDD,tableSchema_s1) 
    output1.createOrReplaceTempView("realTimeTable_1")}) 

unionStreams_s2.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => { 
    val rowRDD = rdd.map(w => Row.fromSeq(new String(w).split(","))) 
    val output2 = sqlContext.createDataFrame(rowRDD,tableSchema_s2) 
    output1.createOrReplaceTempView("realTimeTable_2")}) 

так их в теории я должен быть в состоянии выполнить:

select a.user_id , b.domain from realTimeTable_1 as a join realTimeTable_2 as b on a.device_id = b.device_id 

однако даже делать select * from realTimeTable_2 не производит какой-либо вывод, я думаю, в моем коде отсутствует что-то, может ли кто-нибудь обнаружить недостающую логику, пожалуйста?

+0

Я не могу использовать два контекста в JVM в соответствии с искровым потоковом документации либо: Только один StreamingContext может быть активным в JVM, в то же время. –

ответ

0

В Splice Machine мы никогда не пробовали двойные потоки только с одним потоком, а затем соединялись с постоянными данными через SQL.

Я не вижу начало потока? Вот код, который кажется очень похожим на ваш, я надеюсь, что это поможет.

Проверьте KinesisWordCountASL.scala на главной ветке искры.

Вот ссылка на короткий срок.

https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala

+0

Спасибо, Джон, вы разобрали две разные схемы ввода? Я видел прилагаемый код, к сожалению, он рассматривает два потока как отдельные осколки. Моя проблема заключается в том, как включить платформу комплексной обработки событий, чтобы одновременно анализировать два живых набора данных с 2 x различными schas emas. –

+0

начал работать как ssc.start() –