2016-06-03 11 views
1

Я следую this tutorial вот мой кодpostRestart и preRestart методы не получают пробуждают в AKKA actots

case class ArtGroupDeleteFromES (uuidList:List[String]) 
class ArtGroupDeleteESActor extends Actor{ 
val log = LoggerFactory.getLogger(this.getClass) 
    override def preStart() { 
     log.debug("preStart Starting ArtGroupDeleteESActor instance hashcode # {}", 
     this.hashCode()) 
     } 

    override def postStop() { 
     log.debug("postStop Stopping ArtGroupDeleteESActor instance hashcode # {}", 
     this.hashCode()) 
     } 

    override def preRestart(reason: Throwable, message: Option[Any]) { 
     log.debug("I am restarting") 
     log.debug("ArtGroupDeleteESActor: preRestart") 
     log.debug(s" MESSAGE: ${message.getOrElse("")}") 
     log.debug(s" REASON: ${reason.getMessage}") 
     super.preRestart(reason, message) 
     } 

    override def postRestart(reason: Throwable) { 
     log.debug("restart completed!") 
     log.debug("ArtGroupDeleteESActor: postRestart") 
     log.debug(s" REASON: ${reason.getMessage}") 
     super.postRestart(reason) 
     } 
def receive = { 
    case ArtGroupDeleteFromES(uuidList) => 
throw new Exception("Booom") 
    sender ! true 
    } 
    case message => 
    log.warn("Received unknown message: {}", message) 
    unhandled(message) 
} 

} 

и вот как я посылаю этот актер сообщение

class ArtGroupDeletionActor extends Actor{ 

    val log = LoggerFactory.getLogger(this.getClass) 
override val supervisorStrategy = OneForOneStrategy(
            maxNrOfRetries = 10, withinTimeRange = 10 seconds) { 
    case _:Exception => Restart 
    } 
val artGroupDeleteESActor=context.actorOf(Props[ArtGroupDeleteESActor] 
     .withDispatcher("akka.actor.ArtGroupDeleteESActor-dispatcher") 
     ,name = "ArtGroupDeleteESActor") 

    def receive = { 

    case DeleteArtGroup(uuidList) => 
     val future1 = ask(artGroupDeleteESActor, ArtGroupDeleteFromES(uuidList)).mapTo[Boolean] 
     var isDeletedfromES = Await.result(future1, timeout.duration) 
    case message => 
     log.warn("Unhandled message received : {}", message) 
     unhandled(message) 
    } 
} 

object test extends App{ 
val artGroupDeletionActor=system.actorOf(Props[ArtGroupDeletionActor] 
     .withDispatcher("akka.actor.ArtGroupDeletionActor-dispatcher") 
     ,name = "ArtGroupDeletionActor") 
    artGroupDeletionActor ! DeleteArtGroup(List("123")) 
} 

PostRestart() и preRestart() не вызываются, но вызываются вызовы preStart() и postStop(), пожалуйста, направьте меня, где я делаю неправильно

+0

Где вы посылаете 'ForceRestart'? Это сообщение, которое запускает перезапуск в роли актера. –

+0

здесь val future1 = ask (artGroupDeleteESActor, ArtGroupDeleteFromES (uuidList)). MapTo [Boolean] – swaheed

+0

в моем коде его ArtGroupDeleteFromES (uuidList) вместо ForceRestart, но работа такая же – swaheed

ответ

3

(для простоты позвоню актеры Parent и Child теперь)

Что здесь происходит, что, когда исключение происходит внутри Child.receive, он не посылает ответ на Parent, вместо этого, система актера отправляют некоторые инструкции управления для стратегии контроля. Однако Parent блокируется на Await ожидание завершения future1, которое происходит только после того, как timeout превышает, а затем, в свою очередь, TimeoutException выбрасывается внутрь Parent.receive, убивая (перезапуск) сам Parent актера, и, таким образом, надзорное сообщения о исключение в Child затем передается в deadLetters, никогда не перезапуская Child.

Вы не должны никогда, никогда, когда-либо блок внутри актера, так что это неправильно:

val future1 = ask(artGroupDeleteESActor, ArtGroupDeleteFromES(uuidList)).mapTo[Boolean] 
    var isDeletedfromES = Await.result(future1, timeout.duration) 

Вместо этого, вы должны либо использовать какой-то идентификации сообщения, чтобы отличить один ответ от другого одновременно окружающей среды или добавьте onComplete в будущее и отправьте сообщение в self в закрытии (будьте осторожны: никакой логики, кроме отправки сообщения, должно быть выполнено внутри закрытия в будущее!).

Таким образом, вариант А:

case class ArtGroupDeleteFromES(id: Long, uuidList: List[String]) 
case class ArtGroupDeleteFromESResult(id: Long, success: Boolean) 

class Parent extends Actor { 
    override val supervisionStrategy = ... 
    var msgId = 0L 
    var pendingRequesters = Map.empty[Long, ActorRef] 
    val child = context.actorOf(Props[Child]) 

    def nextId = { 
    msgId += 1 
    msgId 
    } 

    def receive = { 
    case DeleteArtGroup(uuidList) => 
     val id = nextId 
     pendingRequesters += id -> sender() // store a reference to the sender so that you can send it a message when everything completes 
     child ! DeleteArtGroupFromES(nextId, uuidList) 
    case ArtGroupDeleteFromESResult(id, success) => 
     // process result... 
     pendingRequesters(id) ! "done" 
     pendingRequesters -= id 
    } 
} 

А вариант Б:

case class ArtGroupDeleteFromES(uuidList: List[String]) 
case class ArtGroupDeleteFromESResult(replyTo: ActorRef, success: Boolean) 

class Parent extends Actor { 
    override val supervisionStrategy = ... 
    val child = context.actorOf(Props[Child]) 

    def receive = { 
    case DeleteArtGroup(uuidList) => 
     val requester = sender() // when the future completes, sender may have already changed, so you need to remember it 
     (child ? DeleteArtGroupFromES(uuidList)).onComplete { 
     case Success(success) => self ! ArtGroupDeleteFromESResult(requester, success) 
     case Failure(e) => 
      log.warn("Could not delete...", e) 
      self ! ArtGroupDeleteFromESResult(requester, success = false) 
    } 
}