Я пытаюсь сделать запрос,Как разобрать данные из 2 x потоков Kinesis в 1 x Spark Streaming App?
select a.user_id , b.domain from realTimeTable_1 as a join realTimeTable_2 as b on a.device_id = b.device_id
с использованием двух кинезисов Потоков. Однако выход из Stream2 отсутствует, кто-нибудь знает, как объединить или записать одновременно два потока данных в hbase или паркет? Вот мой код, я установить SparkConf().set("spark.streaming.concurrentJobs", "2")
обрабатывать оба потока:
val numShards_s1 = kinesisClient.describeStream("stream1").getStreamDescription().getShards().size
val numShards_s2 = kinesisClient.describeStream("stream2").getStreamDescription().getShards().size
val numStreams_s1 = numShards_s1
val numStreams_s2 = numShards_s2
val batchInterval = Seconds(5)
val kinesisClient = new AmazonKinesisClient(credentials)kinesisClient.setEndpoint("https://kinesis.us-east-1.amazonaws.com")
val kinesisCheckpointInterval = batchInterva
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
val ssc = new StreamingContext(sc, batchInterval)
val kinesisStreams_s1 = (0 until numStreams_s1).map { i =>
KinesisUtils.createStream(ssc, "stream-demo", "stream1", endpointUrl, regionName,InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
}
val kinesisStreams_s2 = (0 until numShards_s2).map { i =>
KinesisUtils.createStream(ssc, "stream-demo", "stream2", endpointUrl, regionName,InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
}
val unionStreams_s1 = ssc.union(kinesisStreams_s1)
val unionStreams_s2 = ssc.union(kinesisStreams_s2)
val schemaString_s1 = "user_id,device_id,action,timestamp
val schemaString_s2= "device_id,domain,timestamp
val tableSchema_s1 = StructType(schemaString_s1.split(",").map(fieldName => StructField(fieldName, StringType, true)))
val tableSchema_s2 = StructType(schemaString_s2.split(",").map(fieldName => StructField(fieldName, StringType, true)))
unionStreams_s1.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => {
val rowRDD = rdd.map(w => Row.fromSeq(new String(w).split(",")))
val output1 = sqlContext.createDataFrame(rowRDD,tableSchema_s1)
output1.createOrReplaceTempView("realTimeTable_1")})
unionStreams_s2.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => {
val rowRDD = rdd.map(w => Row.fromSeq(new String(w).split(",")))
val output2 = sqlContext.createDataFrame(rowRDD,tableSchema_s2)
output1.createOrReplaceTempView("realTimeTable_2")})
так их в теории я должен быть в состоянии выполнить:
select a.user_id , b.domain from realTimeTable_1 as a join realTimeTable_2 as b on a.device_id = b.device_id
однако даже делать select * from realTimeTable_2
не производит какой-либо вывод, я думаю, в моем коде отсутствует что-то, может ли кто-нибудь обнаружить недостающую логику, пожалуйста?
Я не могу использовать два контекста в JVM в соответствии с искровым потоковом документации либо: Только один StreamingContext может быть активным в JVM, в то же время. –