2016-12-06 4 views
1

Предположим, у меня есть следующий код:Scala Future and TimeoutException: как узнать причину?

val futureInt1 = getIntAsync1(); 
val futureInt2 = getIntAsync2(); 

val futureSum = for { 
    int1 <- futureInt1 
    int2 <- futureInt2 
} yield (int1 + int2) 

val sum = Await.result(futureSum, 60 seconds) 

Теперь предположим, что один из getIntAsync1 или getIntAsync2 занимает очень много времени, и это приводит к Await.result бросать исключение:

Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60 seconds] 

Как я должен знать который один из getIntAsync1 или getIntAsync2 все еще находился на рассмотрении и фактически приводил к таймауту?

Обратите внимание, что здесь я сливаясь 2 фьючерсами с молнией, и это простой пример вопроса, но в моем приложении у меня есть такой код на другом уровне (т.е. getIntAsync1 сами могут использовать Future.zip или Future.sequence, карты/flatMap/applicationative)

Как-то я хотел бы иметь возможность регистрировать ожидающие параллельные операции stacktraces, когда тайм-аут возникает в моем основном потоке, чтобы я мог знать, где находятся узкие места в моей системе.


У меня есть существующий устаревший API-интерфейс, который еще не полностью реактивирован и не будет так скоро. Я пытаюсь увеличить время отклика, используя параллелизм. Но с тех пор, как я использую этот код, стало более болезненным понять, почему что-то занимает много времени в моем приложении. Я был бы признателен за любой совет, который вы можете предоставить, чтобы помочь мне отладить такие проблемы.

+0

Что такое 'результат'? –

+0

Я думаю, что это должно быть 'futureSum' –

+0

В какие времена не было ни int1, ни int2, а новое будущее, созданное уроком – Lambder

ответ

1

Ключ реализация является то, что будущего не тайм-аута в вашем примере, это ваш вызывающий поток, который делает паузу в наиболее X время.

Итак, если вы хотите модель время в ваших фьючерсах, вы должны использовать zipWith на каждой ветке и zip с Будущим, которое будет содержать значение в течение определенного количества времени. Если вы используете Akka, вы можете использовать для этого akka.pattern.after вместе с Future.firstCompletedOf.

Теперь, даже если вы это сделаете, как вы определяете почему любое ваше будущее не было завершено вовремя, возможно, они зависели от других фьючерсов, которые не были завершены.

Вопрос сводится к следующему: вы пытаетесь сделать некоторые основные причины для пропускной способности? Затем вы должны следить за ExecutionContext, а не за свои фьючерсы. Фьючерсы - это только значения.

+0

Я бы ok, чтобы сделать дамп потока, когда возникает конфликт, есть ли способ запустить дамп потока в соответствующее время? Как при достижении предела ЕС? –

+0

Вы можете определенно сделать это, если напишите пользовательский EC. –

+0

Хм никогда не делал этого раньше, но, вероятно, попробую это да! –

0

Вы можете проверить, если будущее завершена, вызвав его метод isComplete

if (futureInt1.isComplete) { /*futureInt2 must be the culprit */ } 
if (futureInt2.isComplete) { /*futureInt1 must be the culprit */ } 
+0

, за исключением того, что мы выполняем эти две строки, оба фьючерса могут быть завершены, так как код и предыдущие операторы Await не являются атомами. – Lambder

+0

True. Другим вариантом было бы ожидать результата каждого отдельного будущего: val futureInt1 = Future {Await.result (getIntAsync1(), 60 секунд)} и вместо этого использовать для понимания этих фьючерсов. –

+0

Это немного изменит семантику, так как мы больше не будем ждать всего 60. – Lambder

0

В качестве первого подхода я хотел бы предложить, чтобы поднять ваше будущее [Int] в будущее [Попробуйте [Int]]. Нечто подобное:

object impl { 

    def checkException[T](in: Future[T]): Future[Try[T]] = 
    in.map(Success(_)).recover { 
     case e: Throwable => { 
     Failure(new Exception("Error in future: " + in)) 
     } 
    } 

    implicit class FutureCheck(s: Future[Int]) { 
    def check = checkException(s) 
    } 
} 

Тогда небольшая функция для объединения результатов, что-то вроде этого:

object test { 

    import impl._ 

    val futureInt1 = Future{ 1 } 
    val futureInt2 = Future{ 2 } 

    def combine(a: Try[Int], b: Try[Int])(f: (Int, Int) => (Int)) : Try[Int] = { 
    if(a.isSuccess && b.isSuccess) { 
     Success(f(a.get, b.get)) 
    } 
    else 
    Failure(new Exception("Error adding results")) 
    } 

    val futureSum = for { 
    int1 <- futureInt1.check 
    int2 <- futureInt2.check 
    } yield combine(int1, int2)(_ + _) 
} 

В futureSum вы будете иметь Try [Int] с целыми или Failure за исключением соответствующей возможной ошибке.

Может быть, это может быть полезно

+0

. Этот пример является хорошей иллюстрацией будущей обработки отказа. Но я не думаю, что тайм-аут - это будущий провал, поэтому, к сожалению, он не отвечает на исходный вопрос. – Lambder

+0

да я не вижу, как он отвечает на мой вопрос –

0
val futureInt1 = getIntAsync1(); 
val futureInt2 = getIntAsync2(); 

val futureSum = for { 
    int1 <- futureInt1 
    int2 <- futureInt2 
} yield (int1 + int2) 

Try(Await.result(futureSum, 60 seconds)) match { 
    case Success(sum) => println(sum) 
    case Failure(e) => println("we got timeout. the unfinished futures are: " + List(futureInt1, futureInt2).filter(!_.isCompleted) 
} 
+0

Что делать, если futureInt1 уже является составной частью других фьючерсов? Я хочу знать, какое, возможно, глубоко вложенное будущее занимает много времени, а не только на самом высоком уровне будущего состава –

1

Предлагаемое решение оборачивает каждый из будущего для блока в TimelyFuture, который требует тайм-аут и имя. Внутри он использует Await для обнаружения отдельных тайм-аутов. Пожалуйста, имейте в виду, что этот стиль использования фьючерсов не предназначен для производственного кода, так как он использует блокировку. Это только для диагностики, чтобы выяснить, какие фьючерсы требуют времени для завершения.

package xxx 

import java.util.concurrent.TimeoutException 

import scala.concurrent.{Future, _} 
import scala.concurrent.duration.Duration 
import scala.util._ 
import scala.concurrent.duration._ 

class TimelyFuture[T](f: Future[T], name: String, duration: Duration) extends Future[T] { 

    override def onComplete[U](ff: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = f.onComplete(x => ff(x)) 

    override def isCompleted: Boolean = f.isCompleted 

    override def value: Option[Try[T]] = f.value 

    @scala.throws[InterruptedException](classOf[InterruptedException]) 
    @scala.throws[TimeoutException](classOf[TimeoutException]) 
    override def ready(atMost: Duration)(implicit permit: CanAwait): TimelyFuture.this.type = { 
    Try(f.ready(atMost)(permit)) match { 
     case Success(v) => this 
     case Failure(e) => this 
    } 
    } 

    @scala.throws[Exception](classOf[Exception]) 
    override def result(atMost: Duration)(implicit permit: CanAwait): T = { 
    f.result(atMost) 
    } 

    override def transform[S](ff: (Try[T]) => Try[S])(implicit executor: ExecutionContext): Future[S] = { 
    val p = Promise[S]() 
    Try(Await.result(f, duration)) match { 
     case [email protected](_) => ff(s) match { 
     case Success(v) => p.success(v) 
     case Failure(e) => p.failure(e) 
     } 
     case Failure(e) => e match { 
     case e: TimeoutException => p.failure(new RuntimeException(s"future ${name} has timed out after ${duration}")) 
     case _ => p.failure(e) 
     } 
    } 
    p.future 
    } 

    override def transformWith[S](ff: (Try[T]) => Future[S])(implicit executor: ExecutionContext): Future[S] = { 
    val p = Promise[S]() 
    Try(Await.result(f, duration)) match { 
     case [email protected](_) => ff(s).onComplete({ 
     case Success(v) => p.success(v) 
     case Failure(e) => p.failure(e) 
     }) 
     case Failure(e) => e match { 
     case e: TimeoutException => p.failure(new RuntimeException(s"future ${name} has timed out after ${duration}")) 
     case _ => p.failure(e) 
     } 
    } 
    p.future 
    } 
} 

object Main { 

    import scala.concurrent.ExecutionContext.Implicits.global 

    def main(args: Array[String]): Unit = { 
    val f = Future { 
     Thread.sleep(5); 
     1 
    } 

    val g = Future { 
     Thread.sleep(2000); 
     2 
    } 

    val result: Future[(Int, Int)] = for { 
     v1 <- new TimelyFuture(f, "f", 10 milliseconds) 
     v2 <- new TimelyFuture(g, "g", 10 milliseconds) 
    } yield (v1, v2) 


    val sum = Await.result(result, 1 seconds) // as expected, this throws exception : "RuntimeException: future g has timed out after 10 milliseconds" 
    } 
} 
+0

, который также кажется верным решением для будущей композиции первого уровня, но на самом деле не справится с глубоко вложенным уровнем будущего состава imho –

+0

@SebastienLorber обновлено до гораздо более простого решения – Lambder

+0

hmmm да, это кажется намного проще с точки зрения пользователя и может выполнить эту работу. Однако я вижу некоторые ограничения.Если я увеличиваю период ожидания TimelyFuture до 1 минуты, а общая сумма ожидает всего 1 секунду, я все равно не буду знать, что происходит, поэтому мне нужно убедиться, что у моего дерева параллельных вычислений всегда меньше тайм-аутов для более глубоких листьев (но это имеет смысл в любом случае, просто сложнее поддерживать) –

1

Если вы просто ищете информационные показатели, на которых индивидуальное будущее было занимает много времени (или в сочетании с другими), лучше всего использовать обертку при создании фьючерсов регистрировать показатели:

object InstrumentedFuture { 
     def now() = System.currentTimeMillis() 
     def apply[T](name: String)(code: => T): Future[T] = { 
      val start = now() 
      val f = Future { 
      code 
      } 
      f.onComplete { 
       case _ => println(s"Future ${name} took ${now() - start} ms") 
      } 
      f 
     } 
    } 


    val future1 = InstrumentedFuture("Calculator") { /*...code...*/ } 
    val future2 = InstrumentedFuture("Differentiator") { /*...code...*/ } 
+0

Это похоже на порядочную и простую идею :), вероятно, не решит полностью мою проблему, но может помочь отладить самые простые случаи. –

+0

В предлагаемом решении есть несколько ошибок. Сделайте это вместо: 'Future ({val s = System.nanoTime(); (Try (code), System.nanoTime() - s)}) transform {case Success ((r, t)) => {println (s "Future $ {name} взял $ t ns"); r}} ' –