2016-07-04 5 views
1

Я занимаюсь поиском сторон приложения аккомпанемента CQRS Akka.Akka Persistence Query and actor sharding

Актеры запросов настраиваются как кластерный осколок и заполняются событиями из одного потока запросов на сохранение.

Мои вопросы:

  1. Если один из участников в кластере осколок перезагружать, как его восстановить?

    • Завершить весь кластерный осколок и ответить на все события?
    • Сделайте актеров в кластерном осколке постоянных актеров и сохраните новый набор событий только для стороны запроса?
  2. Если актер, который является наполнителем с запросом на сохранение, перезапускается, как я могу отменить текущий PQ и запустить его снова?

+1

Сохраняете ли вы состояние участников запроса только в памяти? Для моей стороны запроса я использую Persistence Queries для обновления вида базы данных. – thwiegan

+0

Да, я только сохраняю состояние в памяти актера. – Reeebuuk

+0

Являются ли ваши взгляды потребляющими только один постоянный идентификатор или больше? – thwiegan

ответ

2

Как обсуждалось, я бы оценил сохранение вашей стороны запроса в базе данных.

Если это не вариант, и вы хотите придерживаться с одного запросом инерционности на осколок сделать следующее в вашем актере запроса:

var inRecovery: Boolean = true; 

override def preStart() = { 
    //Subscribe to your event live stream now, so you don't miss anything during recovery 
    // e.g. send Subscription message to your persistence query actor 

    //Re-Read everything up to now for recovery 
    readJournal.currentEventsByPersistenceId("persistenceId") 
     .watchTermination()((_, f) => f pipeTo self) // Send Done to self after recovery is finished 
     .map(Replay.apply) // Mark your replay messages 
     .runWith(Sink.actorRef(self, tag)) // Send all replay events to self 
} 

override def receive = { 
    case Done => // Recovery is finished 
     inRecovery = false 
     unstashAll() // unstash all normal messages received during recovery 

    case Replay(payload) => 
     //handle replayed messages 

    case events: Event => 
     //handle normal events from your persistence query 
     inRecovery match { 
      case true => stash() // stash normal messages until recovery is done 
      case false => 
       // recovery is done, start handling normal events 
     } 
} 


case class Replay(payload: AnyRef) 

Так в основном до того, как актер начинает подписаться на актер сохранения запроса и восстановить состояние с конечным потоком всех прошлых событий, который заканчивается после того, как все события прошли. Во время восстановления сохраняются все входящие события, которые не воспроизводятся. Затем, после того как восстановление будет завершено, расстегните все и начните обработку обычных сообщений.