2017-02-15 15 views
1

Я успешно интегрировал код, чтобы вытаскивать сообщения с концентратора событий и обрабатывать их с помощью искровой/искровой потоковой передачи. Теперь я перехожу к управляющему состоянию по мере прохождения сообщений. Это код, который я использую, который по большей части является адаптацией https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.htmlSpark Streaming и Azure Event Hubs mapWithState

По существу это работает с фиктивным источником, он работает с одним потоком на одном разделе, но он не работает для объединенного поток окна. Хотя я мог создавать несколько экземпляров потока по одному для каждого раздела, он как бы побеждает точку объединения и окна .. + мои попытки заставить его работать таким образом не удались. Я своего рода застрял для вдохновения о том, куда идти .. если у кого есть какие-либо идеи, которые были бы великий ..

val sparkSession = SparkSession.builder().master("local[2]").config(sparkConfiguration).getOrCreate() 

val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(10)) 
streamingContext.checkpoint(inputOptions.checkpointDir) 

//derive the stream and window 
val eventHubsStream = EventHubsUtils.createUnionStream(streamingContext, eventHubsParameters) 
val eventHubsWindowedStream = eventHubsStream.window(Seconds(10)) 

val initialRDD = sparkSession.sparkContext.parallelize(List(("dummy", 100L), ("source", 32L))) 
val stateSpec = StateSpec.function(trackStateFunc _) 
    .initialState(initialRDD) 
    .numPartitions(2) 
    .timeout(Seconds(60)) 

val eventStream = eventHubsWindowedStream 
    .map(messageStr => { 
    //parse teh event 
    var event = gson.fromJson(new String(messageStr), classOf[Event]) 

    //return a tuble of key/value pair 
    (event.product_id.toString, 1) 
    }) 

val eventStateStream = eventStream.mapWithState(stateSpec) 

val stateSnapshotStream = eventStateStream.stateSnapshots() 
stateSnapshotStream.print() 

stateSnapshotStream.foreachRDD { rdd => 
    import sparkSession.implicits._ 
    rdd.toDF("word", "count").registerTempTable("batch_word_count") 
} 

streamingContext.remember(Minutes(1)) 

streamingContext 
+0

* он не работает для потока окон в профсоюзе. * Что не работает? –

+0

Извинения, по сути, функция состояния никогда не называется. я не могу отлаживать эту точку. Когда я использую образец кода, это прекрасно .. и когда я использую один поток, это прекрасно .. но не тогда, когда я использую объединенный поток или окно. –

+0

Вы пытались отлаживать локально в своей среде IDE? –

ответ

0

я решил мою проблему, в том, что я в конечном итоге с использованием прямого потока и все мои проблемы Ушли. Я избегал этого, поскольку каталог прогресса поддерживает только HDFS или ADL, и теперь я больше не могу тестировать локально.

EventHubsUtils.createDirectStreams (StreamingContext, inputOptions.namespace, inputOptions.hdfs, карта (inputOptions.eventhub -> GetEventHubParams (inputOptions)))

Тем не менее, поток объединение не работает .. Теперь я просто нужно выяснить, как удалить каталог прогресса в HDFS!