Как обсуждалось, я бы оценил сохранение вашей стороны запроса в базе данных.
Если это не вариант, и вы хотите придерживаться с одного запросом инерционности на осколок сделать следующее в вашем актере запроса:
var inRecovery: Boolean = true;
override def preStart() = {
//Subscribe to your event live stream now, so you don't miss anything during recovery
// e.g. send Subscription message to your persistence query actor
//Re-Read everything up to now for recovery
readJournal.currentEventsByPersistenceId("persistenceId")
.watchTermination()((_, f) => f pipeTo self) // Send Done to self after recovery is finished
.map(Replay.apply) // Mark your replay messages
.runWith(Sink.actorRef(self, tag)) // Send all replay events to self
}
override def receive = {
case Done => // Recovery is finished
inRecovery = false
unstashAll() // unstash all normal messages received during recovery
case Replay(payload) =>
//handle replayed messages
case events: Event =>
//handle normal events from your persistence query
inRecovery match {
case true => stash() // stash normal messages until recovery is done
case false =>
// recovery is done, start handling normal events
}
}
case class Replay(payload: AnyRef)
Так в основном до того, как актер начинает подписаться на актер сохранения запроса и восстановить состояние с конечным потоком всех прошлых событий, который заканчивается после того, как все события прошли. Во время восстановления сохраняются все входящие события, которые не воспроизводятся. Затем, после того как восстановление будет завершено, расстегните все и начните обработку обычных сообщений.
Сохраняете ли вы состояние участников запроса только в памяти? Для моей стороны запроса я использую Persistence Queries для обновления вида базы данных. – thwiegan
Да, я только сохраняю состояние в памяти актера. – Reeebuuk
Являются ли ваши взгляды потребляющими только один постоянный идентификатор или больше? – thwiegan