Я хочу иметь возможность делать параллельные запросы к нескольким хранилищам данных и консолидировать результаты. Я пытаюсь понять, действительно ли мой подход действителен или есть лучший способ подойти к этой проблеме. Я определенно новичок в Akka/Spray/Scala и действительно хочу лучше понять, как правильно создавать эти компоненты. Любые предложения/советы будут очень признательны. Попытка обернуть голову вокруг использования актеров и фьючерсов для такого типа реализации.Akka консолидировать запросы параллельной базы данных
Spray Услуги:
trait DemoService extends HttpService with Actor with ActorLogging {
implicit val timeout = Timeout(5 seconds) // needed for `?` below
val mongoMasterActor = context.actorOf(Props[MongoMasterActor], "redisactor")
val dbMaster = context.actorOf(Props[DbMasterActor], "dbactor")
val messageApiRouting =
path("summary"/Segment/Segment) { (dataset, timeslice) =>
onComplete(getDbResponses(dbMaster, dataset, timeslice)) {
case Success(dbMessageResponse) => complete(s"The result was $dbMessageResponse")
case Failure(ex) => complete(s"An error occurred: ${ex.getMessage}")
}
}
/** Passes the desired actor reference for a specific dataset and timeslice for summary data retrieval
*
* @param mongoActor an actor reference to the RedisActor that will handle the appropriate request routing
* @param dataset The dataset for which the summary has been requested
* @param timeslice The timeslice (Month, Week, Day, etc.) for which the summary has been requested
*/
def getSummary(mongoActor: ActorRef, dataset: String, timeslice: String): Future[DbMessageResponse] = {
log.debug(s"dataset: $dataset timeslice: $timeslice")
val dbMessage = DbMessage("summary", dataset + timeslice)
(mongoActor ? dbMessage).mapTo[DbMessageResponse]
}
def getDbResponses(dbActor: ActorRef, dataset: String, timeslice: String): Future[SummaryResponse] = {
log.debug(s"dataset: $dataset timeslice: $timeslice")
val dbMessage = DbMessage("summary", dataset + timeslice)
(dbActor ? dbMessage).mapTo[SummaryResponse]
}
def getSummaryPayload(mongoSummary: DbMessageResponse, redisSummary: DbMessageResponse): String = {
mongoSummary.response + redisSummary.response
}
}
Akka Актер/Будущие фиктивные DB запросы:
class DbMasterActor extends Actor with ActorLogging {
private var originalSender: ActorRef = _
//TODO: Need to add routing to the config to limit instances
val summaryActor = context.actorOf(Props(new SummaryActor), "summaryactor")
def receive = {
case msg: DbMessage => {
this.originalSender = sender
msg.query match {
case "summary" => {
getDbResults().onComplete{
case Success(result) => originalSender ! result
case Failure(ex) => log.error(ex.getMessage)
}
}
}
}
//If not match log an error
case _ => log.error("Received unknown message: {} ")
}
def getDbResults(): Future[SummaryResponse] = {
log.debug("hitting db results")
val mongoResult = Future{ Thread.sleep(500); "Mongo"}
val redisResult = Future{ Thread.sleep(800); "redis"}
for{
mResult <- mongoResult
rResult <- redisResult
} yield SummaryResponse(mResult, rResult)
}
}
Для-понимания по фьючерсу выглядит хорошо для меня, если это вопрос. – tariksbl
Выглядит также хорошо. – shutty