Я пишу некоторые автономные тесты интеграции вокруг Apache Spark Streaming. Я хочу проверить, что мой код может поглощать все виды краевых случаев в моих симулированных тестовых данных. Когда я делал это с регулярными RDD (не потоковыми). Я мог бы использовать свои встроенные данные и называть их «распараллеливать», чтобы превратить его в искровой RDD. Тем не менее, я не могу найти такого метода для создания destreams. В идеале я хотел бы называть некоторую «push» функцию время от времени и напоминать волшебство в моем dstream. ATM Я делаю это с помощью Apache Kafka: я создаю временную очередь, и я пишу ей. Но это кажется излишним. Я бы скорее создал test-dstream непосредственно из своих тестовых данных, не используя Kafka в качестве посредника.Программно создавая dstreams в искры apache
1
A
ответ
0
Я нашел этот базовый пример: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
Ключевым моментом здесь является вызовом команды «магазин». Замените содержимое магазина тем, что вы хотите.
3
Для целей тестирования вы можете создать поток ввода из очереди RDD. Нажатие большего числа RDD в очереди будет имитировать обработку более событий в пакетном интервале.
val sc = SparkContextHolder.sc
val ssc = new StreamingContext(sc, Seconds(1))
val inputData: mutable.Queue[RDD[Int]] = mutable.Queue()
val inputStream: InputDStream[Int] = ssc.queueStream(inputData)
inputData += sc.makeRDD(List(1, 2)) // Emulate the RDD created during the first batch interval
inputData += sc.makeRDD(List(3, 4)) // 2nd batch interval
// etc
val result = inputStream.map(x => x*x)
result.foreachRDD(rdd => assertSomething(rdd))
ssc.start() // Don't forget to start the streaming context
1
В дополнение к решению Рафаэля, я думаю, вам нравится также либо обрабатывать одну партию времени, либо весь доступный подход. Вы должны установить флаг oneAtATime соответственно на аргумент необязательного метода queustream, как показано ниже:
val slideDuration = Milliseconds(100)
val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[8]")
val sparkSession: SparkSession = SparkSession.builder.config(conf).getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
val queueOfRDDs = mutable.Queue[RDD[String]]()
val streamingContext: StreamingContext = new StreamingContext(sparkContext, slideDuration)
val rddOneQueuesAtATimeDS: DStream[String] = streamingContext.queueStream(queueOfRDDs, oneAtATime = true)
val rddFloodOfQueuesDS: DStream[String] = streamingContext.queueStream(queueOfRDDs, oneAtATime = false)
rddOneQueuesAtATimeDS.print(120)
rddFloodOfQueuesDS.print(120)
streamingContext.start()
for (i <- (1 to 10)) {
queueOfRDDs += sparkContext.makeRDD(simplePurchase(i))
queueOfRDDs += sparkContext.makeRDD(simplePurchase((i + 3) * (i + 3)))
Thread.sleep(slideDuration.milliseconds)
}
Thread.sleep(1000L)