2015-06-17 10 views
2

Следуя документации Akka Cluster, у меня есть пример рабочего набора.Комбинирование маршрута распыления + соответствие шаблону актера

http://doc.akka.io/docs/akka/snapshot/java/cluster-usage.html

Так что я пытаюсь интегрировать, что с маршрутизацией спрея.

Моя идея - иметь кластер за кулисами и через http rest, вызвать эту услугу.

У меня есть следующий код.

object Boot extends App { 

    val port = if (args.isEmpty) "0" else args(0) 
    val config = 
    ConfigFactory 
     .parseString(s"akka.remote.netty.tcp.port=$port") 
     .withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")) 
     .withFallback(ConfigFactory.load()) 

    val system = ActorSystem("ClusterSystem", config) 
    val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend") 
    implicit val actSystem = ActorSystem() 

    IO(Http) ! Http.Bind(frontend, interface = config.getString("http.interface"), port = config.getInt("http.port")) 
} 

class TransformationFrontend extends Actor { 

    var backends = IndexedSeq.empty[ActorRef] 
    var jobCounter = 0 
    implicit val timeout = Timeout(5 seconds) 

    override def receive: Receive = { 

    case _: Http.Connected => sender ! Http.Register(self) 

    case HttpRequest(GET, Uri.Path("/job"), _, _, _) => 

     jobCounter += 1 
     val backend = backends(jobCounter % backends.size) 

     val originalSender = sender() 

     val future : Future[TransformationResult] = (backend ? new TransformationJob(jobCounter + "-job")).mapTo[TransformationResult] 
     future onComplete { 
     case Success(s) => 
      println("received from backend: " + s.text) 
      originalSender ! s.text 
     case Failure(f) => println("error found: " + f.getMessage) 
     } 

    case job: TransformationJob if backends.isEmpty => 
     sender() ! JobFailed("Service unavailable, try again later", job) 

    case job: TransformationJob => 
     jobCounter += 1 
     backends(jobCounter % backends.size) forward job 

    case BackendRegistration if !backends.contains(sender()) => 
     println("backend registered") 
     context watch sender() 
     backends = backends :+ sender() 

    case Terminated(a) => 
     backends = backends.filterNot(_ == a) 
    } 
} 

Но то, что я действительно хочу сделать, - это комбинировать распылительную маршрутизацию с соответствующими шаблонами.

Вместо того, чтобы писать мой GET, как выше, я хотел бы написать так:

path("job") { 
    get { 
    respondWithMediaType(`application/json`) { 
     complete { 
     (backend ? new TransformationJob(jobCounter + "-job")).mapTo[TransformationResult] 
     } 
    } 
    } 
} 

Но расширяя мой актер с этим классом, я должен сделать следующее

def receive = runRoute(defaultRoute) 

Как могу ли я объединить этот подход с моими методами сопоставления шаблонов Actors для трансформации? BackendRegistration, Terminated, TransformationJob?

+1

Как насчет перенаправления запросов на спрей другому актеру, который имеет логику для выполнения сопоставления с образцом. Если вы можете передать requestContext в сообщении, чтобы вы могли выполнить HTTP-запрос оттуда. –

ответ

3

Вы можете составить PartialFunction походит Receive с PartialFunction.orElse:

class TransformationFrontend extends Actor { 
    // ... 
    def myReceive: Receive = { 
    case job: TransformationJob => // ... 
    // ... 
    } 
    def defaultRoute: Route = 
    get { 
     // ... 
    } 
    override def receive: Receive = runRoute(defaultRoute) orElse myReceive 
} 

Тем не менее, он часто имеет смысл разделить функциональность на несколько актеров (как это было предложено в комментарии выше), если это возможно.

+0

Отлично, я был настолько сосредоточен, пытаясь смешать оба вместе, что я никогда не подумал бы разделить методы приема. Спасибо. –