Я ищу возможности увеличить параллелизм и производительность в моем коде Scala 2.9/Akka 2.0 RC2. Учитывая следующий код:Каковы некоторые возможности улучшения производительности/параллелизма в следующем коде Scala + Akka?
import akka.actor._
case class DataDelivery(data:Double)
class ComputeActor extends Actor {
var buffer = scala.collection.mutable.ArrayBuffer[Double]()
val functionsToCompute = List("f1","f2","f3","f4","f5")
var functionMap = scala.collection.mutable.LinkedHashMap[String,(Map[String,Any]) => Double]()
functionMap += {"f1" -> f1}
functionMap += {"f2" -> f2}
functionMap += {"f3" -> f3}
functionMap += {"f4" -> f4}
functionMap += {"f5" -> f5}
def updateData(data:Double):scala.collection.mutable.ArrayBuffer[Double] = {
buffer += data
buffer
}
def f1(map:Map[String,Any]):Double = {
// println("hello from f1")
0.0
}
def f2(map:Map[String,Any]):Double = {
// println("hello from f2")
0.0
}
def f3(map:Map[String,Any]):Double = {
// println("hello from f3")
0.0
}
def f4(map:Map[String,Any]):Double = {
// println("hello from f4")
0.0
}
def f5(map:Map[String,Any]):Double = {
// println("hello from f5")
0.0
}
def computeValues(immutableBuffer:IndexedSeq[Double]):Map[String,Double] = {
var map = Map[String,Double]()
try {
functionsToCompute.foreach(function => {
val value = functionMap(function)
function match {
case "f1" =>
var v = value(Map("lookback"->10,"buffer"->immutableBuffer,"parm1"->0.0))
map += {function -> v}
case "f2" =>
var v = value(Map("lookback"->20,"buffer"->immutableBuffer))
map += {function -> v}
case "f3" =>
var v = value(Map("lookback"->30,"buffer"->immutableBuffer,"parm1"->1.0,"parm2"->false))
map += {function -> v}
case "f4" =>
var v = value(Map("lookback"->40,"buffer"->immutableBuffer))
map += {function -> v}
case "f5" =>
var v = value(Map("buffer"->immutableBuffer))
map += {function -> v}
case _ =>
println(this.unhandled())
}
})
} catch {
case ex: Exception =>
ex.printStackTrace()
}
map
}
def receive = {
case DataDelivery(data) =>
val startTime = System.nanoTime()/1000
val answers = computeValues(updateData(data))
val endTime = System.nanoTime()/1000
val elapsedTime = endTime - startTime
println("elapsed time is " + elapsedTime)
// reply or forward
case msg =>
println("msg is " + msg)
}
}
object Test {
def main(args:Array[String]) {
val system = ActorSystem("actorSystem")
val computeActor = system.actorOf(Props(new ComputeActor),"computeActor")
var i = 0
while (i < 1000) {
computeActor ! DataDelivery(i.toDouble)
i += 1
}
}
}
Когда я запускаю этот вывод (преобразуется в микросекундах) является
elapsed time is 4898
elapsed time is 184
elapsed time is 144
.
.
.
elapsed time is 109
elapsed time is 103
Вы можете увидеть инкрементный компилятор в JVM пинает в
Я думал, что один быстро. победа может быть, чтобы изменить
functionsToCompute.foreach(function => {
в
functionsToCompute.par.foreach(function => {
, но это приводит к следующим истекших раз
elapsed time is 31689
elapsed time is 4874
elapsed time is 622
.
.
.
elapsed time is 698
elapsed time is 2171
Некоторая информация:
1) Я бегу это на Macbook Pro с 2 ядрами.
2) В полной версии функции представляют собой длительные операции, которые пересекают части изменяемого общего буфера. Это не кажется проблемой, поскольку получение сообщений из почтового ящика актера контролирует поток, но я подозреваю, что это может быть проблемой с увеличением параллелизма. Вот почему я конвертировал в IndexedSeq.
3) В полной версии, список functionsToCompute может меняться, так что не все элементы в functionMap обязательно называются (т.е.) functionMap.size может быть гораздо больше, чем functionsToCompute.size
4) Функции могут быть вычислены параллельно, но результирующая карта должна быть завершена до возвращения
Некоторые вопросы:
1) Что я могу сделать, чтобы сделать параллельную версию работать быстрее?
2) Где было бы целесообразно добавлять неблокирующие и блокирующие фьючерсы?
3) Где было бы целесообразно переслать вычисления другому актеру?
4) Каковы некоторые возможности для повышения неизменности/безопасности?
Спасибо, Брюс
Я не уверен, что вы делаете с вашими ответами ... но это пункт 4, который для меня интересный. Похоже, вы можете хорошо использовать akka.dispatch.Future.sequence здесь. Создайте список фьючерсов, которые выполняют вычисления и используют последовательность, чтобы превратить это в будущее в список результатов. Когда это будущее вернется, сверните результаты в сводную карту/список/контейнер, в которых вы нуждаетесь. –
@DerekWyatt. В полной версии ответы передаются другому актеру. Спасибо за ваш комментарий. –
Почему у вас нет одного актера для каждой функции? –