2014-10-29 4 views
1

Я пытаюсь найти самый аккуратный способ выполнить последовательность Futures в последовательности, где одно исполнение Будущего зависит от предыдущего. Я пытаюсь сделать это для произвольного количества фьючерсов.Scala - Выполнять произвольное число фьючерсов последовательно, но зависимо

случае Пользователь:

  • я извлек несколько идентификаторов из моей базы данных.
  • Теперь мне нужно получить некоторые связанные данные в веб-службе.
  • Я хочу остановиться, как только я нашел правильный результат.
  • Я забочусь только о результате, который преуспел.

Выполнение всех этих операций параллельно, а затем анализ полученных результатов не является вариантом. Я должен сделать один запрос одновременно и выполнить следующий запрос только в том случае, если предыдущий запрос не дал никаких результатов.

Текущее решение находится вдоль этих линий. Использование foldLeft для выполнения запросов, а затем только оценка следующего будущего, если предыдущее будущее соответствует некоторому условию.

def dblFuture(i: Int) = { i * 2 } 
val list = List(1,2,3,4,5) 
val future = list.foldLeft(Future(0)) { 
    (previousFuture, next) => { 
    for { 
     previousResult <- previousFuture 
     nextFuture <- { if (previousResult <= 4) dblFuture(next) else previousFuture } 
    } yield (nextFuture) 
    } 
} 

Большой недостаток этого является) я продолжаю обрабатывать все элементы, даже когда я получил результат я доволен и б) один раз я нашел результат я после, я держу оценивая предикат. В этом случае это просто, но на самом деле это может быть сложнее.

Я чувствую, что мне не хватает гораздо более элегантного решения.

+0

Я запутался в случае использования, потому что это не похоже подобно тому, как у вас есть вид зависимости потока данных, который вы описываете («одно исполнение Будущего зависит от предыдущего»), поскольку вы выполняете только следующее Будущее, если предыдущий результат в будущем был * пустым *. Что мне не хватает? То есть, следующее будущее зависит от предыдущего результата каким-либо образом, кроме как просто решить, выполняется ли оно? –

+0

Это очень похоже на http: // stackoverflow.com/questions/26438991/is-there-sequential-future-find/26439838 # 26439838 (см. мой ответ там) и http://stackoverflow.com/questions/26349318/how-to-invoke-a-method-again- and-again-until-it-returns-a-future-value –

+0

@ChrisMartin Следующее исполнение фьючерса зависит не только от того, удалось ли предыдущему Будущему, но и от ответа. Например: если предыдущее будущее содержит WSResponse со статусом 404, выполните следующее будущее, иначе нет. – healsjnr

ответ

5

Посмотрев на ваш пример, кажется, что предыдущий результат не имеет никакого отношения к последующим результатам, и вместо этого важно только то, что предыдущий результат удовлетворяет некоторому условию, чтобы предотвратить вычисление следующего результата. Если это так, вот рекурсивное решение, использующее filter и recoverWith.

def untilFirstSuccess[A, B](f: A => Future[B])(condition: B => Boolean)(list: List[A]): Future[B] = { 
    list match { 
     case head :: tail => f(head).filter(condition).recoverWith { case _: Throwable => untilFirstSuccess(f)(condition)(tail) } 
     case Nil => Future.failed(new Exception("All failed..")) 
    } 
} 

filter будет называться только тогда, когда Future завершена, и recoverWith будет вызываться только если Future не удалось.

def dblFuture(i: Int): Future[Int] = Future { 
    println("Executing.. " + i) 
    i * 2 
} 

val list = List(1, 2, 3, 4, 5) 

scala> untilFirstSuccess(dblFuture)(_ > 6)(list) 
Executing.. 1 
Executing.. 2 
Executing.. 3 
Executing.. 4 
res1: scala.concurrent.Future[Int] = [email protected] 

scala> res1.value 
res2: Option[scala.util.Try[Int]] = Some(Success(8)) 
+0

Большое спасибо! – Ikrom

2

опрятного путь, и «истинное функциональное программирование» является scalaz потока;) Однако вам нужно переключиться на scalaz.concurrent.Task из будущего для лестницы абстракции для «будущего результата». Это немного другое. Задача чистая, а будущее - «текущее вычисление», но у них много общего.

import scalaz.concurrent.Task 
    import scalaz.stream.Process 

    def dblTask(i: Int) = Task { 
    println(s"Executing task $i") 
    i * 2 
    } 

    val list = Seq(1,2,3,4,5) 

    val p: Process[Task, Int] = Process.emitAll(list) 

    val result: Task[Option[Int]] = 
    p.flatMap(i => Process.eval(dblTask(i))).takeWhile(_ < 10).runLast 

    println(s"result = ${result.run}") 

Результат:

Executing task 1 
Executing task 2 
Executing task 3 
Executing task 4 
Executing task 5 
result = Some(8) 

если ваш расчет уже SCALA будущее, вы можете превратить его в Task

implicit class Transformer[+T](fut: => SFuture[T]) { 
    def toTask(implicit ec: scala.concurrent.ExecutionContext): Task[T] = { 
    import scala.util.{Failure, Success} 
    import scalaz.syntax.either._ 
    Task.async { 
     register => 
     fut.onComplete { 
      case Success(v) => register(v.right) 
      case Failure(ex) => register(ex.left) 
     } 
    } 
    } 
} 
+0

Спасибо, Евгений, мне очень нравится это решение, но я не использую ScalaZ в настоящий момент надеялся на решение с использованием стандартного будущего Scala (отсюда и я отметил @LimbSoup как правильный ответ). Однако посмотрим на SacalZ. – healsjnr