2015-04-22 1 views
7

Я звоню актеру, используя шаблон запроса в приложении Spray и возвращаю результат в качестве ответа HTTP. Я сопоставляю сбои от актера с пользовательским кодом ошибки.Разрешение фьючерсов Akka на запрос в случае отказа

val authActor = context.actorOf(Props[AuthenticationActor]) 

callService((authActor ? TokenAuthenticationRequest(token)).mapTo[LoggedInUser]) { user => 
    complete(StatusCodes.OK, user) 
} 

def callService[T](f: => Future[T])(cb: T => RequestContext => Unit) = { 
onComplete(f) { 
    case Success(value: T) => cb(value) 
    case Failure(ex: ServiceException) => complete(ex.statusCode, ex.errorMessage) 
    case e => complete(StatusCodes.InternalServerError, "Unable to complete the request. Please try again later.") 
    //In reality this returns a custom error object. 
} 
} 

Это работает правильно, когда authActor посылает отказ, но если authActor выбрасывает исключение, ничего не происходит до тех пор, задать тайм-аут не завершится. Например:

override def receive: Receive = { 
    case _ => throw new ServiceException(ErrorCodes.AuthenticationFailed, "No valid session was found for that token") 
} 

Я знаю, что документы Akka говорят, что

Чтобы завершить будущее с исключением вам нужно послать сообщение об ошибке отправителю. Это не делается автоматически, когда актер выдает исключение при обработке сообщения.

Но, учитывая, что я использую много интерфейсов между участниками маршрутизации спрей и участниками службы, я бы предпочел не обертывать принимающую часть каждого дочернего актера с помощью try/catch. Есть ли лучший способ добиться автоматической обработки исключений в дочерних актерах и немедленно решить будущее в случае исключения?

Редактировать: это мое текущее решение. Однако, это довольно грязно, чтобы сделать это для каждого дочернего актера.

override def receive: Receive = { 
case default => 
    try { 
    default match { 
     case _ => throw new ServiceException("")//Actual code would go here 
    } 
    } 
    catch { 
    case se: ServiceException => 
     logger.error("Service error raised:", se) 
     sender ! Failure(se) 
    case ex: Exception => 
     sender ! Failure(ex) 
     throw ex 
    } 
} 

Таким образом, если это ожидаемая ошибка (т. Е. ServiceException), она обрабатывается путем создания сбоя. Если это неожиданно, он немедленно возвращает сбой, так что будущее будет разрешено, но затем выдает исключение, поэтому он все равно может обрабатываться SupervisorStrategy.

+0

Ну ... отправьте сообщение об ошибке, прежде чем выбросить исключение. –

+0

Это то, что я не хотел делать, - сказал я. ** Я бы предпочел не обманывать получающую часть каждого дочернего актера с помощью try/catch **. Это игрушечный пример, вполне возможно, что я не контролирую, где генерируется исключение. –

+0

Ну ... вы знаете ... одна из фундаментальных дорог устойчивых распределенных систем - «делать ошибки явными». Подумайте о различных ошибках, которые могут произойти ... сделайте их явными. Если у вас могут быть ошибки «TypeSafe» ... еще лучше. –

ответ

13

Если вы хотите способ обеспечить автоматическую отправку ответа отправителю в случае неожиданного исключения, то что-то подобное может работать для вас:

trait FailurePropatingActor extends Actor{ 
    override def preRestart(reason:Throwable, message:Option[Any]){ 
    super.preRestart(reason, message) 
    sender() ! Status.Failure(reason) 
    } 
} 

Мы переопределять preRestart и распространять провал назад к отправителю как Status.Failure, что приведет к сбою восходящего потока Future. Кроме того, здесь важно позвонить super.preRestart, так как здесь происходит остановка ребенка. Используя это в качестве актера выглядит примерно так:

case class GetElement(list:List[Int], index:Int) 
class MySimpleActor extends FailurePropatingActor { 
    def receive = { 
    case GetElement(list, i) => 
     val result = list(i) 
     sender() ! result 
    } 
} 

Если бы я должен был назвать экземпляр этого актера, как так:

import akka.pattern.ask 
import concurrent.duration._ 

val system = ActorSystem("test") 
import system.dispatcher 
implicit val timeout = Timeout(2 seconds) 
val ref = system.actorOf(Props[MySimpleActor]) 
val fut = ref ? GetElement(List(1,2,3), 6) 

fut onComplete{ 
    case util.Success(result) => 
    println(s"success: $result") 

    case util.Failure(ex) => 
    println(s"FAIL: ${ex.getMessage}") 
    ex.printStackTrace()  
}  

Тогда это будет правильно ударил Failure блок. Теперь код в этом базовом признаке хорошо работает, когда Futures не участвуют в актере, который расширяет эту черту, как простой актер здесь. Но если вы используете Future s, тогда вам нужно быть осторожным, поскольку исключения, которые происходят в Future, не вызывают перезапуска в актере, а также в preRestart, вызов sender() не вернет правильную ссылку, потому что актер уже перешел в следующее сообщение. Актер, как это показывает, что вопрос:

class MyBadFutureUsingActor extends FailurePropatingActor{ 
    import context.dispatcher 

    def receive = { 
    case GetElement(list, i) => 
     val orig = sender() 
     val fut = Future{ 
     val result = list(i) 
     orig ! result 
     }  
    } 
} 

Если бы мы использовали этот актер в предыдущем тестовом коде, мы всегда получить тайм-аут в нештатной ситуации.Для того, чтобы смягчить, что вам нужно трубы результаты фьючерсов обратно отправителю, как так:

class MyGoodFutureUsingActor extends FailurePropatingActor{ 
    import context.dispatcher 
    import akka.pattern.pipe 

    def receive = { 
    case GetElement(list, i) => 
     val fut = Future{ 
     list(i) 
     } 

     fut pipeTo sender() 
    } 
} 

В данном конкретном случае, сам актер не будет возобновлено, потому что он не сталкивался неперехваченное исключение. Теперь, если ваш актер необходимо сделать некоторые дополнительные обработки после будущего, вы можете направить обратно к себе и явно не в состоянии, когда вы получаете Status.Failure:

class MyGoodFutureUsingActor extends FailurePropatingActor{ 
    import context.dispatcher 
    import akka.pattern.pipe 

    def receive = { 
    case GetElement(list, i) => 
     val fut = Future{ 
     list(i) 
     } 

     fut.to(self, sender()) 

    case d:Double => 
     sender() ! d * 2 

    case Status.Failure(ex) => 
     throw ex 
    } 
} 

Если такое поведение становится общим, вы можете сделать его доступным для любого актерам это нужно так:

trait StatusFailureHandling{ me:Actor => 
    def failureHandling:Receive = { 
    case Status.Failure(ex) => 
     throw ex  
    } 
} 

class MyGoodFutureUsingActor extends FailurePropatingActor with StatusFailureHandling{ 
    import context.dispatcher 
    import akka.pattern.pipe 

    def receive = myReceive orElse failureHandling 

    def myReceive:Receive = { 
    case GetElement(list, i) => 
     val fut = Future{ 
     list(i) 
     } 

     fut.to(self, sender()) 

    case d:Double => 
     sender() ! d * 2   
    } 
} 
+1

Это выглядит хорошо, спасибо! –