2015-12-10 7 views
2

У меня на самом деле заканчиваются варианты. В моем искрообразующем приложении. Я хочу сохранить состояние на некоторых клавишах. Я получаю события от Кафки. Затем я извлекаю ключи из события, например userID. Когда из Kafka не происходит событий, я хочу обновлять счетчик по отношению к каждому идентификатору пользователя каждые 3 секунды, так как я сконфигурировал загрузку моего StreamingContext с 3 секундами.Воспроизведение RDD в искровом потоке для обновления аккумулятора

Теперь, как я делаю это может быть уродливым, но, по крайней мере, это работает: у меня есть accumulableCollection вроде этого:

val userID = ssc.sparkContext.accumulableCollection(new mutable.HashMap[String,Long]()) 

Затем я создаю «фальшивый» событие и толкать его к моей искры потоковый контекст как:

val rddQueue = new mutable.SynchronizedQueue[RDD[String]]() 
for (i <- 1 to 100) { 
    rddQueue += ssc.sparkContext.makeRDD(Seq("FAKE_MESSAGE")) 
    Thread.sleep(3000) 
} 
val inputStream = ssc.queueStream(rddQueue) 

inputStream.foreachRDD(UPDATE_MY_ACCUMULATOR) 

Это позволило бы мне доступ к моему accumulatorCollection и обновить все счетчики всех UserIds. До сих пор все работает отлично, но когда я изменить свой цикл от:

for (i <- 1 to 100) {} #This is for test 

To:

while (true) {} #This is to let me access and update my accumulator through the whole application life cycle 

Затем, когда я бегу мой ./spark-submit, мое приложение застревает на этом этапе:

15/12/10 18:09:00 INFO BlockManagerMasterActor: Registering block manager slave1.cluster.example:38959 with 1060.3 MB RAM, BlockManagerId(1, slave1.cluster.example, 38959) 

Любая подсказка о том, как разрешить это? Есть ли довольно простой способ, который позволил бы мне обновлять значения моих идентификаторов пользователей (вместо того, чтобы создавать неиспользуемые RDD и периодически его перенаправлять на queuestream)?

ответ

3

Причина, по которой версия while (true) ... не работает, заключается в том, что элемент управления никогда не возвращается к основной строке выполнения, и поэтому ничего ниже этой строки не выполняется. Чтобы решить эту конкретную проблему, мы должны выполнить цикл while в отдельном потоке. Future { while() ...} должно работать. Кроме того, Thread.sleep(3000) при заполнении QueueDStream в приведенном выше примере не требуется. Spark Streaming будет потреблять одно сообщение из очереди на каждом интервале потоковой передачи.

Лучший способ инициировать этот приток сообщений «галочки» будет с ConstantInputDStream, который воспроизводит тот же RDD на каждом интервале потоковой передачи, поэтому устранение необходимости создания притока RDD с помощью QueueDStream.

Тем не менее, мне кажется, что нынешний подход кажется хрупким и нуждается в пересмотре.

 Смежные вопросы

  • Нет связанных вопросов^_^