2012-02-29 5 views
0

Я ищу возможности увеличить параллелизм и производительность в моем коде Scala 2.9/Akka 2.0 RC2. Учитывая следующий код:Каковы некоторые возможности улучшения производительности/параллелизма в следующем коде Scala + Akka?

import akka.actor._ 

case class DataDelivery(data:Double) 

class ComputeActor extends Actor { 
    var buffer = scala.collection.mutable.ArrayBuffer[Double]() 

    val functionsToCompute = List("f1","f2","f3","f4","f5") 
    var functionMap = scala.collection.mutable.LinkedHashMap[String,(Map[String,Any]) => Double]() 
    functionMap += {"f1" -> f1} 
    functionMap += {"f2" -> f2} 
    functionMap += {"f3" -> f3} 
    functionMap += {"f4" -> f4} 
    functionMap += {"f5" -> f5} 

    def updateData(data:Double):scala.collection.mutable.ArrayBuffer[Double] = { 
     buffer += data 
     buffer 
    } 

    def f1(map:Map[String,Any]):Double = { 
// println("hello from f1") 
     0.0 
    } 

    def f2(map:Map[String,Any]):Double = { 
// println("hello from f2") 
     0.0 
    } 

    def f3(map:Map[String,Any]):Double = { 
// println("hello from f3") 
     0.0 
    } 

    def f4(map:Map[String,Any]):Double = { 
// println("hello from f4") 
     0.0 
    } 

    def f5(map:Map[String,Any]):Double = { 
// println("hello from f5") 
     0.0 
    } 

    def computeValues(immutableBuffer:IndexedSeq[Double]):Map[String,Double] = { 
     var map = Map[String,Double]() 
     try { 
      functionsToCompute.foreach(function => { 
       val value = functionMap(function) 
       function match { 
        case "f1" => 
         var v = value(Map("lookback"->10,"buffer"->immutableBuffer,"parm1"->0.0)) 
         map += {function -> v} 
        case "f2" => 
         var v = value(Map("lookback"->20,"buffer"->immutableBuffer)) 
         map += {function -> v} 
        case "f3" => 
         var v = value(Map("lookback"->30,"buffer"->immutableBuffer,"parm1"->1.0,"parm2"->false)) 
         map += {function -> v} 
        case "f4" => 
         var v = value(Map("lookback"->40,"buffer"->immutableBuffer)) 
         map += {function -> v} 
        case "f5" => 
         var v = value(Map("buffer"->immutableBuffer)) 
         map += {function -> v} 
        case _ => 
         println(this.unhandled()) 
       } 
      }) 
     } catch { 
      case ex: Exception => 
       ex.printStackTrace() 
     } 
     map 
    } 

    def receive = { 
     case DataDelivery(data) => 
     val startTime = System.nanoTime()/1000 
     val answers = computeValues(updateData(data)) 
     val endTime = System.nanoTime()/1000 
     val elapsedTime = endTime - startTime 
     println("elapsed time is " + elapsedTime) 
     // reply or forward 
     case msg => 
     println("msg is " + msg) 
    } 

} 

object Test { 
    def main(args:Array[String]) { 
     val system = ActorSystem("actorSystem") 
     val computeActor = system.actorOf(Props(new ComputeActor),"computeActor") 
     var i = 0 
     while (i < 1000) { 
      computeActor ! DataDelivery(i.toDouble) 
      i += 1 
     } 
    } 
} 

Когда я запускаю этот вывод (преобразуется в микросекундах) является

elapsed time is 4898 
elapsed time is 184 
elapsed time is 144 
    . 
    . 
    . 
elapsed time is 109 
elapsed time is 103 

Вы можете увидеть инкрементный компилятор в JVM пинает в

Я думал, что один быстро. победа может быть, чтобы изменить

functionsToCompute.foreach(function => { 

в

functionsToCompute.par.foreach(function => { 

, но это приводит к следующим истекших раз

elapsed time is 31689 
elapsed time is 4874 
elapsed time is 622 
    . 
    . 
    . 
elapsed time is 698 
elapsed time is 2171 

Некоторая информация:

1) Я бегу это на Macbook Pro с 2 ядрами.

2) В полной версии функции представляют собой длительные операции, которые пересекают части изменяемого общего буфера. Это не кажется проблемой, поскольку получение сообщений из почтового ящика актера контролирует поток, но я подозреваю, что это может быть проблемой с увеличением параллелизма. Вот почему я конвертировал в IndexedSeq.

3) В полной версии, список functionsToCompute может меняться, так что не все элементы в functionMap обязательно называются (т.е.) functionMap.size может быть гораздо больше, чем functionsToCompute.size

4) Функции могут быть вычислены параллельно, но результирующая карта должна быть завершена до возвращения

Некоторые вопросы:

1) Что я могу сделать, чтобы сделать параллельную версию работать быстрее?

2) Где было бы целесообразно добавлять неблокирующие и блокирующие фьючерсы?

3) Где было бы целесообразно переслать вычисления другому актеру?

4) Каковы некоторые возможности для повышения неизменности/безопасности?

Спасибо, Брюс

+1

Я не уверен, что вы делаете с вашими ответами ... но это пункт 4, который для меня интересный. Похоже, вы можете хорошо использовать akka.dispatch.Future.sequence здесь. Создайте список фьючерсов, которые выполняют вычисления и используют последовательность, чтобы превратить это в будущее в список результатов. Когда это будущее вернется, сверните результаты в сводную карту/список/контейнер, в которых вы нуждаетесь. –

+0

@DerekWyatt. В полной версии ответы передаются другому актеру. Спасибо за ваш комментарий. –

+0

Почему у вас нет одного актера для каждой функции? –

ответ

2

Предоставление пример, в соответствии с просьбой (извините за задержку ... У меня нет уведомлений о для SO).

В документации Akka есть Section on 'Composing Futures', но я дам вам что-то более приспособленное к вашей ситуации.

Теперь, прочитав это, уделите немного времени, чтобы прочитать учебники и документы на веб-сайте Akka. Вам не хватает информации о ключе, которую эти документы предоставят вам.

import akka.dispatch.{Await, Future, ExecutionContext} 
import akka.util.duration._ 
import java.util.concurrent.Executors 

object Main { 
    // This just makes the example work. You probably have enough context 
    // set up already to not need these next two lines 
    val pool = Executors.newCachedThreadPool() 
    implicit val ec = ExecutionContext.fromExecutorService(pool) 

    // I'm simulating your function. It just has to return a tuple, I believe 
    // with a String and a Double 
    def theFunction(s: String, d: Double) = (s, d) 
    def main(args: Array[String]) { 
    // Here we run your functions - I'm just doing a thousand of them 
    // for fun. You do what yo need to do 
    val listOfFutures = (1 to 1000) map { i => 
     // Run them in parallel in the future 
     Future { 
     theFunction(i.toString, i.toDouble) 
     } 
    } 
    // These lines can be composed better, but breaking them up should 
    // be more illustrative. 
    // 
    // Turn the list of Futures (i.e. Seq[Future[(String, Double)]]) into a 
    // Future with a sequence of results (i.e. Future[Seq[(String, Double)]]) 
    val futureOfResults = Future.sequence(listOfFutures) 

    // Convert that future into another future that contains a map instead 
    // instead of a sequence 
    val intermediate = futureOfResults map { _.toList.toMap } 

    // Wait for it complete. Ideally you don't do this. Continue to 
    // transform the future into other forms or use pipeTo() to get it to go 
    // as a result to some other Actor. "Await" is really just evil... the 
    // only place you should really use it is in silly programs like this or 
    // some other special purpose app. 
    val resultingMap = Await.result(intermediate, 1 second) 
    println(resultingMap) 

    // Again, just to make the example work 
    pool.shutdown() 
    } 
} 

Все, что вам нужно в пути к классам, чтобы получить этот ход является akka-actor банкой. Сайт Akka расскажет вам, как настроить то, что вам нужно, но это действительно просто.

+0

Спасибо за помощь. Это здорово и касается большинства моих вопросов. –

+0

... и да, я обязательно прочитаю документы. –

+0

Это довольно классный материал. Я поменял Await с onComplete, и это сработало красиво. Я попробую pipeTo() далее. –

 Смежные вопросы

  • Нет связанных вопросов^_^