У меня на самом деле заканчиваются варианты. В моем искрообразующем приложении. Я хочу сохранить состояние на некоторых клавишах. Я получаю события от Кафки. Затем я извлекаю ключи из события, например userID. Когда из Kafka не происходит событий, я хочу обновлять счетчик по отношению к каждому идентификатору пользователя каждые 3 секунды, так как я сконфигурировал загрузку моего StreamingContext с 3 секундами.Воспроизведение RDD в искровом потоке для обновления аккумулятора
Теперь, как я делаю это может быть уродливым, но, по крайней мере, это работает: у меня есть accumulableCollection вроде этого:
val userID = ssc.sparkContext.accumulableCollection(new mutable.HashMap[String,Long]())
Затем я создаю «фальшивый» событие и толкать его к моей искры потоковый контекст как:
val rddQueue = new mutable.SynchronizedQueue[RDD[String]]()
for (i <- 1 to 100) {
rddQueue += ssc.sparkContext.makeRDD(Seq("FAKE_MESSAGE"))
Thread.sleep(3000)
}
val inputStream = ssc.queueStream(rddQueue)
inputStream.foreachRDD(UPDATE_MY_ACCUMULATOR)
Это позволило бы мне доступ к моему accumulatorCollection и обновить все счетчики всех UserIds. До сих пор все работает отлично, но когда я изменить свой цикл от:
for (i <- 1 to 100) {} #This is for test
To:
while (true) {} #This is to let me access and update my accumulator through the whole application life cycle
Затем, когда я бегу мой ./spark-submit, мое приложение застревает на этом этапе:
15/12/10 18:09:00 INFO BlockManagerMasterActor: Registering block manager slave1.cluster.example:38959 with 1060.3 MB RAM, BlockManagerId(1, slave1.cluster.example, 38959)
Любая подсказка о том, как разрешить это? Есть ли довольно простой способ, который позволил бы мне обновлять значения моих идентификаторов пользователей (вместо того, чтобы создавать неиспользуемые RDD и периодически его перенаправлять на queuestream)?