2015-10-22 5 views
1

Я пишу некоторые автономные тесты интеграции вокруг Apache Spark Streaming. Я хочу проверить, что мой код может поглощать все виды краевых случаев в моих симулированных тестовых данных. Когда я делал это с регулярными RDD (не потоковыми). Я мог бы использовать свои встроенные данные и называть их «распараллеливать», чтобы превратить его в искровой RDD. Тем не менее, я не могу найти такого метода для создания destreams. В идеале я хотел бы называть некоторую «push» функцию время от времени и напоминать волшебство в моем dstream. ATM Я делаю это с помощью Apache Kafka: я создаю временную очередь, и я пишу ей. Но это кажется излишним. Я бы скорее создал test-dstream непосредственно из своих тестовых данных, не используя Kafka в качестве посредника.Программно создавая dstreams в искры apache

ответ

3

Для целей тестирования вы можете создать поток ввода из очереди RDD. Нажатие большего числа RDD в очереди будет имитировать обработку более событий в пакетном интервале.

val sc = SparkContextHolder.sc 
val ssc = new StreamingContext(sc, Seconds(1)) 
val inputData: mutable.Queue[RDD[Int]] = mutable.Queue() 
val inputStream: InputDStream[Int] = ssc.queueStream(inputData) 

inputData += sc.makeRDD(List(1, 2)) // Emulate the RDD created during the first batch interval 
inputData += sc.makeRDD(List(3, 4)) // 2nd batch interval 
// etc 

val result = inputStream.map(x => x*x) 
result.foreachRDD(rdd => assertSomething(rdd)) 
ssc.start() // Don't forget to start the streaming context 
1

В дополнение к решению Рафаэля, я думаю, вам нравится также либо обрабатывать одну партию времени, либо весь доступный подход. Вы должны установить флаг oneAtATime соответственно на аргумент необязательного метода queustream, как показано ниже:

val slideDuration = Milliseconds(100) 
val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[8]") 
val sparkSession: SparkSession = SparkSession.builder.config(conf).getOrCreate() 
val sparkContext: SparkContext = sparkSession.sparkContext 
val queueOfRDDs = mutable.Queue[RDD[String]]() 


val streamingContext: StreamingContext = new StreamingContext(sparkContext, slideDuration) 
val rddOneQueuesAtATimeDS: DStream[String] = streamingContext.queueStream(queueOfRDDs, oneAtATime = true) 
val rddFloodOfQueuesDS: DStream[String] = streamingContext.queueStream(queueOfRDDs, oneAtATime = false) 

rddOneQueuesAtATimeDS.print(120) 
rddFloodOfQueuesDS.print(120) 

streamingContext.start() 


for (i <- (1 to 10)) { 
    queueOfRDDs += sparkContext.makeRDD(simplePurchase(i)) 
    queueOfRDDs += sparkContext.makeRDD(simplePurchase((i + 3) * (i + 3))) 
    Thread.sleep(slideDuration.milliseconds) 
} 

Thread.sleep(1000L) 

 Смежные вопросы

  • Нет связанных вопросов^_^