2017-02-14 36 views
1

Я полный новичок в Акке и Скале. В качестве одного из моих первых проектов я решил сделать реализацию сортировки слияния, где вместо рекурсии я создаю новые актеры, которые выполняют разделение и слияние. Кажется, что моя система достигает листьев дерева сортировки слияния, и даже происходит некоторое слияние, но затем оно останавливается, и я получаю исключение AskTimeoutException. У меня была аналогичная проблема в другом проекте, связанном с Ask. Может ли кто-нибудь указать мне в правильном направлении?Akka Ask держит тайм-аут в реализации сортировки слиянием

ParentMerger получают реализация:

def receive = { 
    case ParentMerger.Begin => { 
     implicit var timeout = Timeout(60.seconds) 
     println("Parent sending off first halves") 
     // Assumption: at the beginning the array size is 2 or greater 
     var arrayFuture1 = mergers(0) ? ParentMerger.SendHalf(array.slice(0, array.length/2)) 
     var arrayFuture2 = mergers(1) ? ParentMerger.SendHalf(array.slice(array.length/2, array.length)) 

     arrayFuture1.onComplete { 
     case Success(Merger.Reply(arr1: ArrayBuffer[Int])) => { 
      arrayFuture2.onComplete { 
      case Success(Merger.Reply(arr2: ArrayBuffer[Int])) => { 
       print(merge(arr1, arr2).toString()) 
       println("Final merge done") 
      } 
      } 
     } 
     } 
    } 
    } 

Слияния получить реализацию:

def receive = { 
    case ParentMerger.SendHalf(array: ArrayBuffer[Int]) if array.length == 1 => { 
     println("Child received array of size 1") 
     sender() ! Merger.Reply(array) 
    } 
    case ParentMerger.SendHalf(array: ArrayBuffer[Int]) if array.length >= 2 => { 
     println("Child received an array of size >= 2") 
     for(i <- 0 to 1) { 
     mergers(i) = context.actorOf(Props[Merger]) 
     } 

     implicit var timeout = Timeout(60.seconds) 
     var arrayFuture1 = mergers(0) ? ParentMerger.SendHalf(array.slice(0, array.length/2)) 
     var arrayFuture2 = mergers(1) ? ParentMerger.SendHalf(array.slice(array.length/2, array.length)) 

     arrayFuture1.onComplete { 
     case Success(Merger.Reply(arr1: ArrayBuffer[Int])) => { 
      arrayFuture2.onComplete { 
      case Success(Merger.Reply(arr2: ArrayBuffer[Int])) => { 
       println("Child merge") 
       sender() ! Merger.Reply(merge(arr1, arr2)) 
      } 
      } 
     } 
     } 
    } 
    } 

Вывод, который я получаю:

Parent sending off first halves 
Child received an array of size >= 2 
Child received an array of size >= 2 
Child received an array of size >= 2 
Child received an array of size >= 2 
Child received array of size 1 
Child received array of size 1 
Child received an array of size >= 2 
Child received an array of size >= 2 
Child received array of size 1 
Child received an array of size >= 2 
Child received array of size 1 
Child received array of size 1 
Child received array of size 1 
Child received an array of size >= 2 
Child received array of size 1 
Child received array of size 1 
Child merge 
Child merge 
Child received array of size 1 
Child merge 
Child merge 
Child received array of size 1 
[INFO] [02/14/2017 08:35:32.412] [Main-akka.actor.default-dispatcher-13] [akka://Main/deadLetters] Message [Merger$Reply] from Actor[akka://Main/user/app/$a/$b/$b#-1137516511] to Actor[akka://Main/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[INFO] [02/14/2017 08:35:32.412] [Main-akka.actor.default-dispatcher-13] [akka://Main/deadLetters] Message [Merger$Reply] from Actor[akka://Main/user/app/$b/$a#2073409209] to Actor[akka://Main/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[INFO] [02/14/2017 08:35:32.412] [Main-akka.actor.default-dispatcher-13] [akka://Main/deadLetters] Message [Merger$Reply] from Actor[akka://Main/user/app/$a/$a#-1459967586] to Actor[akka://Main/deadLetters] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[INFO] [02/14/2017 08:35:32.412] [Main-akka.actor.default-dispatcher-13] [akka://Main/deadLetters] Message [Merger$Reply] from Actor[akka://Main/user/app/$b/$b/$b#-2142577608] to Actor[akka://Main/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
scala.MatchError: Failure(akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://Main/user/app/$a#520493765]] after [60000 ms]. Sender[Actor[akka://Main/user/app#1966408365]] sent message of type "ParentMerger$SendHalf".) (of class scala.util.Failure) 
    at ParentMerger$$anonfun$receive$1.$anonfun$applyOrElse$1(ParentMerger.scala:67) 
    at ParentMerger$$anonfun$receive$1.$anonfun$applyOrElse$1$adapted(ParentMerger.scala:66) 
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) 
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:140) 
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) 
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) 

ответ

1

Вы отвечаете неверном актер:

sender() ! Merger.Reply(merge(arr1, arr2)) 

При вызове изнутри ответ на будущее, вероятно, не будет работать. Захват sender вне ваших onComplete блоков:

val originalSender = sender() 
    arrayFuture1.onComplete { 
    case Success(Merger.Reply(arr1: ArrayBuffer[Int])) => { 
     arrayFuture2.onComplete { 
     case Success(Merger.Reply(arr2: ArrayBuffer[Int])) => { 
      println("Child merge") 
      originalSender ! Merger.Reply(merge(arr1, arr2)) 
     } 
     } 
    } 
    } 
+0

Он установил ее. Спасибо! Итак, если я поставлю отправитель() в блоке onComplete, будет ли он ссылаться на отправителя Merger.Reply? –

+0

Мне не совсем ясно, что это такое. Это может быть ваш * актер (т. Е. 'Self'), но двойные вложенные блоки' onComplete' заставляют меня думать, что это, вероятно, какой-то подсистемный актер, который вы бы даже не имели доступа к Я узнал, что обычно лучше всего по этой причине избегайте шаблона 'ask': он действительно восприимчив к * явно невинным * рефакторингам, приводящим к загадочному (и молчащему) сбою кода. –

 Смежные вопросы

  • Нет связанных вопросов^_^