2015-06-17 1 views
2

Я хочу иметь возможность делать параллельные запросы к нескольким хранилищам данных и консолидировать результаты. Я пытаюсь понять, действительно ли мой подход действителен или есть лучший способ подойти к этой проблеме. Я определенно новичок в Akka/Spray/Scala и действительно хочу лучше понять, как правильно создавать эти компоненты. Любые предложения/советы будут очень признательны. Попытка обернуть голову вокруг использования актеров и фьючерсов для такого типа реализации.Akka консолидировать запросы параллельной базы данных

Spray Услуги:

trait DemoService extends HttpService with Actor with ActorLogging { 
    implicit val timeout = Timeout(5 seconds) // needed for `?` below 
    val mongoMasterActor = context.actorOf(Props[MongoMasterActor], "redisactor") 
    val dbMaster = context.actorOf(Props[DbMasterActor], "dbactor") 

    val messageApiRouting = 
     path("summary"/Segment/Segment) { (dataset, timeslice) => 
      onComplete(getDbResponses(dbMaster, dataset, timeslice)) { 
      case Success(dbMessageResponse) => complete(s"The result was $dbMessageResponse") 
      case Failure(ex) => complete(s"An error occurred: ${ex.getMessage}") 
      } 
     } 

    /** Passes the desired actor reference for a specific dataset and timeslice for summary data retrieval 
    * 
    * @param mongoActor an actor reference to the RedisActor that will handle the appropriate request routing 
    * @param dataset The dataset for which the summary has been requested 
    * @param timeslice The timeslice (Month, Week, Day, etc.) for which the summary has been requested 
    */ 
    def getSummary(mongoActor: ActorRef, dataset: String, timeslice: String): Future[DbMessageResponse] = { 
    log.debug(s"dataset: $dataset timeslice: $timeslice") 
    val dbMessage = DbMessage("summary", dataset + timeslice) 
    (mongoActor ? dbMessage).mapTo[DbMessageResponse] 
    } 

    def getDbResponses(dbActor: ActorRef, dataset: String, timeslice: String): Future[SummaryResponse] = { 
    log.debug(s"dataset: $dataset timeslice: $timeslice") 
    val dbMessage = DbMessage("summary", dataset + timeslice) 
    (dbActor ? dbMessage).mapTo[SummaryResponse] 
    } 

    def getSummaryPayload(mongoSummary: DbMessageResponse, redisSummary: DbMessageResponse): String = { 
    mongoSummary.response + redisSummary.response 
    } 

} 

Akka Актер/Будущие фиктивные DB запросы:

class DbMasterActor extends Actor with ActorLogging { 


    private var originalSender: ActorRef = _ 

    //TODO: Need to add routing to the config to limit instances 
    val summaryActor = context.actorOf(Props(new SummaryActor), "summaryactor") 

    def receive = { 

    case msg: DbMessage => { 
     this.originalSender = sender 
     msg.query match { 
     case "summary" => { 

      getDbResults().onComplete{ 
      case Success(result) => originalSender ! result 
      case Failure(ex) => log.error(ex.getMessage) 

      } 
     } 
     } 
    } 

    //If not match log an error 
    case _ => log.error("Received unknown message: {} ") 

    } 


    def getDbResults(): Future[SummaryResponse] = { 
    log.debug("hitting db results") 
    val mongoResult = Future{ Thread.sleep(500); "Mongo"} 
    val redisResult = Future{ Thread.sleep(800); "redis"} 

    for{ 
     mResult <- mongoResult 
     rResult <- redisResult 

    } yield SummaryResponse(mResult, rResult) 

    } 
} 
+0

Для-понимания по фьючерсу выглядит хорошо для меня, если это вопрос. – tariksbl

+0

Выглядит также хорошо. – shutty

ответ

0

После чтения эффективного Акку Джейми Аллен, я собираюсь попытаться применить его предложение «Камея».

Slideshare: http://www.slideshare.net/shinolajla/effective-akka-scalaio

Github: https://github.com/jamie-allen/effective_akka

Я думаю, что я создал будет работать, но не звучит как лучший подход, основанный на комментарии Джейми в своих переговорах. Я обновлю/отредактирую назад к этому сообщению то, что я реализовал (или попытаюсь).

Резюме Актер (Cameo актер):

object SummaryResponseHandler { 
    case object DbRetrievalTimeout 

    def props(mongoDb: ActorRef, redisDb: ActorRef, originalSender: ActorRef): Props = { 
    Props(new SummaryResponseHandler(mongoDb, redisDb, originalSender)) 
    } 
} 

class SummaryResponseHandler(mongoDb: ActorRef, redisDb: ActorRef, 
          originalSender: ActorRef) extends Actor with ActorLogging { 

    import SummaryResponseHandler._ 
    var mongoSummary, redisSummary: Option[String] = None 
    def receive = LoggingReceive { 
    case MongoSummary(summary) => 
     log.debug(s"Received mongo summary: $summary") 
     mongoSummary = summary 
     collectSummaries 
    case RedisSummary(summary) => 
     log.debug(s"Received redis summary: $summary") 
     redisSummary = summary 
     collectSummaries 
    case DbRetrievalTimeout => 
     log.debug("Timeout occurred") 
     sendResponseAndShutdown(DbRetrievalTimeout) 
    } 

    def collectSummaries = (mongoSummary, redisSummary) match { 
    case (Some(m), Some(r)) => 
     log.debug(s"Values received for both databases") 
     timeoutMessager.cancel 
     sendResponseAndShutdown(DataSetSummary(mongoSummary, redisSummary)) 
    case _ => 
    } 

    def sendResponseAndShutdown(response: Any) = { 
    originalSender ! response 
    log.debug("Stopping context capturing actor") 
    context.stop(self) 
    } 

    import context.dispatcher 
    val timeoutMessager = context.system.scheduler.scheduleOnce(
    250 milliseconds, self, DbRetrievalTimeout) 
} 

class SummaryRetriever(mongoDb: ActorRef, redisDb: ActorRef) extends Actor with ActorLogging { 
    def receive = { 
    case GetSummary(dataSet) => 
     log.debug("received dataSet") 
     val originalSender = sender 
     val handler = context.actorOf(SummaryResponseHandler.props(mongoDb,redisDb, originalSender), "cameo-message-handler") 
     mongoDb.tell(GetSummary(dataSet), handler) 
     redisDb.tell(GetSummary(dataSet), handler) 
    case _ => log.debug(s"Unknown result $GetSummary(datset)") 
    } 

} 

Общие:

case class GetSummary(dataSet: String) 
case class DataSetSummary(
    mongo: Option[String], 
    redis: Option[String] 
) 

case class MongoSummary(
    summary: Option[String] 
         ) 

case class RedisSummary(
    summary: Option[String] 
         ) 

trait MongoProxy extends Actor with ActorLogging 
trait RedisProxy extends Actor with ActorLogging 

Mock Заглушки:

class MongoProxyStub extends RedisProxy { 
    val summaryData = Map[String, String](
    "dataset1" -> "MongoData1", 
    "dataset2" -> "MongoData2") 

    def receive = LoggingReceive { 
    case GetSummary(dataSet: String) => 
     log.debug(s"Received GetSummary for ID: $dataSet") 
     summaryData.get(dataSet) match { 
     case Some(data) => sender ! MongoSummary(Some(data)) 
     case None => sender ! MongoSummary(Some("")) 
     } 
    } 
} 

class RedisProxyStub extends MongoProxy{ 
    val summaryData = Map[String, String](
    "dataset1" -> "RedisData1", 
    "dataset2" -> "RedisData2") 

    def receive = LoggingReceive { 
    case GetSummary(dataSet: String) => 
     log.debug(s"Received GetSummary for ID: $dataSet") 
     summaryData.get(dataSet) match { 
     case Some(data) => sender ! RedisSummary(Some(data)) 
     case None => sender ! RedisSummary(Some("")) 
     } 
    } 
} 

загрузки (Вы должны использовать тест, но просто хотел бежать от загрузки):

object Boot extends App{ 

    val system = ActorSystem("DbSystem") 

    val redisProxy = system.actorOf(Props[RedisProxyStub], "cameo-success-mongo") 
    val mongoProxy = system.actorOf(Props[MongoProxyStub], "cameo-success-redis") 
    val summaryRetrieverActor = system.actorOf(Props(new SummaryRetriever(redisProxy, mongoProxy)), "cameo-retriever1") 

    implicit val timeout = Timeout(5 seconds) 
    val future = summaryRetrieverActor ? GetSummary("dataset1") 
    val result = Await.result(future, timeout.duration).asInstanceOf[DataSetSummary] 
    println(Some(result.mongo).x) 
    println(result.redis) 

    system.shutdown() 

} 

Application Config:

akka.loglevel = "DEBUG" 
akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] 
akka.actor.debug.autoreceive = on 
akka.actor.debug.lifecycle = on 
akka.actor.debug.receive = on 
akka.actor.debug.event-stream = on