2017-01-19 10 views
3

Я пытаюсь провести параллельное программирование с помощью Scala и Akka, для которых я новичок. У меня довольно простое приложение Monte Carlo Pi (аппроксимирует pi по кругу), который я построил на нескольких языках. Однако производительность версии, которую я построила в Akka, меня озадачивает.Akka - худшее исполнение с большим количеством актеров

У меня есть последовательная версия, написанная на чистой скале, которая имеет тенденцию к выполнению примерно 400 мс.

По сравнению с одним работником-актером версия Акка занимает около 300-350 мс, однако, поскольку я увеличиваю количество участников, это время резко возрастает. С 4 актерами время может составлять от 500 мс до 1200 мс или выше.

Количество итераций разделяется между действующими участниками, поэтому в идеале производительность должна быть лучше, чем больше из них есть, в настоящее время она становится значительно хуже.

Мой код

object MCpi{ 
    //Declare initial values 
    val numWorkers = 2 
    val numIterations = 10000000 

    //Declare messages that will be sent to actors 
    sealed trait PiMessage 
    case object Calculate extends PiMessage 
    case class Work(iterations: Int) extends PiMessage 
    case class Result(value: Int) extends PiMessage 
    case class PiApprox(pi: Double, duration: Double) 

    //Main method 
    def main(args: Array[String]): Unit = { 
    val system = ActorSystem("MCpi_System") //Create Akka system 
    val master = system.actorOf(Props(new MCpi_Master(numWorkers, numIterations))) //Create Master Actor 
    println("Starting Master") 

    master ! Calculate //Run calculation 
    } 
} 

//Master 
class MCpi_Master(numWorkers: Int, numIterations: Int) extends Actor{ 

    var pi: Double = _ // Store pi 
    var quadSum: Int = _ //the total number of points inside the quadrant 
    var numResults: Int = _ //number of results returned 
    val startTime: Double = System.currentTimeMillis() //calculation start time 

    //Create a group of worker actors 
    val workerRouter = context.actorOf(
    Props[MCpi_Worker].withRouter(RoundRobinPool(numWorkers)), name = "workerRouter") 
    val listener = context.actorOf(Props[MCpi_Listener], name = "listener") 

    def receive = { 
    //Tell workers to start the calculation 
     //For each worker a message is sent with the number of iterations it is to perform, 
     //iterations are split up between the number of workers. 
    case Calculate => for(i <- 0 until numWorkers) workerRouter ! Work(numIterations/numWorkers); 

     //Receive the results from the workers 
     case Result(value) => 
      //Add up the total number of points in the circle from each worker 
     quadSum += value 
      //Total up the number of results which have been received, this should be 1 for each worker 
     numResults += 1 

     if(numResults == numWorkers) { //Once all results have been collected 
      //Calculate pi 
      pi = (4.0 * quadSum)/numIterations 
      //Send the results to the listener to output 
     listener ! PiApprox(pi, duration = System.currentTimeMillis - startTime) 
     context.stop(self) 
     } 
    } 
} 
//Worker 
class MCpi_Worker extends Actor { 
    //Performs the calculation 
    def calculatePi(iterations: Int): Int = { 

    val r = scala.util.Random // Create random number generator 
    var inQuadrant: Int = 0 //Store number of points within circle 

    for(i <- 0 to iterations){ 
     //Generate random point 
     val X = r.nextFloat() 
     val Y = r.nextFloat() 

     //Determine whether or not the point is within the circle 
     if(((X * X) + (Y * Y)) < 1.0) 
     inQuadrant += 1 
    } 
    inQuadrant //return the number of points within the circle 
    } 

    def receive = { 
    //Starts the calculation then returns the result 
    case Work(iterations) => sender ! Result(calculatePi(iterations)) 
    } 
} 

//Listener 
class MCpi_Listener extends Actor{ //Recieves and prints the final result 
    def receive = { 
    case PiApprox(pi, duration) => 
     //Print the results 
     println("\n\tPi approximation: \t\t%s\n\tCalculation time: \t%s".format(pi, duration)) 
     //Print to a CSV file 
     val pw: FileWriter = new FileWriter("../../../..//Results/Scala_Results.csv", true) 
     pw.append(duration.toString()) 
     pw.append("\n") 
     pw.close() 
     context.system.terminate() 

    } 
} 

Равнина Scala последовательная версия

object MCpi { 
    def main(args: Array[String]): Unit = { 
     //Define the number of iterations to perform 
     val iterations = args(0).toInt; 
     val resultsPath = args(1); 

     //Get the current time 
     val start = System.currentTimeMillis() 


     // Create random number generator 
     val r = scala.util.Random 
     //Store number of points within circle 
     var inQuadrant: Int = 0 

     for(i <- 0 to iterations){ 
      //Generate random point 
      val X = r.nextFloat() 
      val Y = r.nextFloat() 

      //Determine whether or not the point is within the circle 
      if(((X * X) + (Y * Y)) < 1.0) 
       inQuadrant += 1 
     } 
     //Calculate pi 
     val pi = (4.0 * inQuadrant)/iterations 
     //Get the total time 
     val time = System.currentTimeMillis() - start 
     //Output values 
     println("Number of Iterations: " + iterations) 
     println("Pi has been calculated as: " + pi) 
     println("Total time taken: " + time + " (Milliseconds)") 

     //Print to a CSV file 
     val pw: FileWriter = new FileWriter(resultsPath + "/Scala_Results.csv", true) 
     pw.append(time.toString()) 
     pw.append("\n") 
     pw.close() 
    } 
} 

Любые предложения о том, почему это происходит и как я могу улучшить производительность будет очень приветствуется.

Edit: Я хотел бы поблагодарить всех вас за ваши ответы, это мой первый вопрос на этом сайте и все ответы являются чрезвычайно полезными, у меня есть много, чтобы посмотреть и сейчас :)

+0

В этом случае некоторую информацию о том, какой процессор (ы) вы используете это, вероятно, полезно. –

+0

1) Пожалуйста, разместите свой код на SO. Отформатируйте его перед публикацией. 2) Что вы ожидаете от реализации актера, когда вы выполняете метод «calculatePi» несколько раз, что из того, что я вижу, эквивалент вашей последовательной реализации? И из того, что я вижу, вы просто вычисляете PI несколько раз (количество вычислений эквивалентно количеству действующих участников, что, вероятно, является объяснением замедления)? Поправьте меня если я ошибаюсь. 3) Считаете ли вы, что вы можете ничего не выиграть, используя модель актера в этом случае? –

+0

@ Процессор Jasper-M - это четырехъядерный ядро ​​Intel i7-4510U @ 3.1GHz @Branislav 1) Хорошо, я попробую обновить сообщение с кодом, когда я буду свободен позже. 2) 'calculatePi' запускается каждым рабочим, он генерирует множество случайных точек и измеряет, находятся ли эти точки в пределах« круга »определенного размера (в данном случае 1.0), а затем возвращает количество точек в круге (quadSum), как только результаты возвращаются от каждого рабочего, расчет выполняется один раз, чтобы определить, что такое Pi (в мастер-актере). 3) Я предположил, что у меня будет какое-то увеличение производительности, разделяющее работу над несколькими актерами. – Cipher478

ответ

4

У вас возникла проблема синхронизации вокруг экземпляра Random, который вы используете.

Более конкретно, эта линия

val r = scala.util.Random // Create random number generator 

на самом деле не «создать генератор случайных чисел», но поднимает одноплодной object что scala.util удобно предлагает Вам. Это означает, что все потоки будут делиться им и будут синхронизироваться вокруг его семени (подробнее см. Код java.util.Random.nextFloat).

Просто изменив эту строку в

val r = new scala.util.Random // Create random number generator 

вы должны получить параллелизм ускорение. Как указано в комментариях, ускорение будет зависеть от вашей архитектуры и т. Д. И т. Д., Но, по крайней мере, она не будет настолько сильно подвержена сильной синхронизации.

Обратите внимание, что java.util будет использовать System.nanoTime как семя вновь созданного Random, поэтому вам не нужно беспокоиться о проблемах рандомизации.

+0

Спасибо, я не знал об этом, это действительно полезно знать. – Cipher478

+0

Не знал ни того, ни другого, я узнал о вашем примере :) хорошее обучение! Вы могли видеть ускорение? –

+0

Только что получил возможность реализовать ваше предложение и вау, резкую скорость. Результаты теперь намного больше соответствуют тому, что я ожидал. Большое спасибо. – Cipher478

-1

I что ваша проблема вызвана выполнением тяжелых вычислений в теле функции приема, может быть, некоторые из них работают в одном потоке, поэтому вы просто добавляете вес системы aktor к стандартным однопоточным вычислениям, тем самым делая это помедленнее. Из AKKA документации:

За кулисами Akka будет работать наборы актеров на множествах реальных потоков, где обычно много актеров разделяют одну нить, и последующие вызовы одного актера, может в конечном итоге обрабатывается в разных потоках. Akka гарантирует, что эта деталь реализации не влияет на однопоточность обработки состояния актера.

Я не уверен, если это имеет место, но вы можете попробовать запустить вычисление в будущем:

Future { 
    //your code 
} 

Чтобы сделать его работу необходимо предоставить неявный контекст выполнения, вы можете сделать это во многих пути, но две являются самыми легкими:

  1. Импорт контекста глобальное исполнение

  2. Импорт executio п контекст актера:

    импорт context.dispatcher

Второй должен быть использован insied вашего класса актер тела.

2

Я думаю, что это большой вопрос, стоящий в копании. Используя систему Akka Actor, которая работает с некоторыми системными издержками, я ожидаю, что прирост производительности будет наблюдаться только тогда, когда масштаб достаточно велик. Я тестировал ваши две версии (не-akka vs akka) с минимальным изменением кода. При 1 миллион или 10 миллионов обращений, как и ожидалось, вряд ли будет какая-либо разница в производительности, независимо от Akka против non-Akka или числа используемых работников. Но при 100 миллионах просмотров вы можете видеть постоянную разницу в производительности.

Помимо расширения всего хитов до 100 миллионов, единственное изменение кода я сделал заменял scala.util.Random с java.util.concurrent.ThreadLocalRandom:

//val r = scala.util.Random // Create random number generator 
def r = ThreadLocalRandom.current 
... 
    //Generate random point 
    //val X = r.nextFloat() 
    //val Y = r.nextFloat() 
    val X = r.nextDouble(0.0, 1.0) 
    val Y = r.nextDouble(0.0, 1.0) 

Это все было сделано на старом MacBook Pro с процессором Quadro Core 2 ГГц и 8 ГБ памяти. Вот тест перспективы результатов на 100 миллионов всего просмотров:

  • Non-Akka приложение занимает ~ 1720 мса
  • Akka приложение 2 рабочих занимает ~ 770 мса
  • Akka приложение 4 рабочих занимает ~ 430 мс

Индивидуальные тест-пробеги ниже ...

Non-Akka

$ SBT "runMain calcpi.MCpi 100000000/TMP"

определения

[Информация] Загрузка проекта из/Users/ИЕ/проекты// тест лестницы/Акка-высчитывает пи/проект [информация] Установить текущий проект Akka Pi Calculation (in build file:/Пользователи/leo/projects/scala/test/akka-calculate-pi /) [info] Запуск calcpi.MCpi 100000000/tmp Количество итераций: 100000000 Pi рассчитано как: 3.1415916 Общее время, затраченное: 1722 (миллисекунды) [успех] Общее время: 2 сек, завершенные 20 января 2017 3:26:20 PM

$ SBT «runMain calcpi.MCpi 100000000/tmp "

[info] Загрузка описания проекта из/Пользователи/leo/projects/scala/test/akka-calculate-pi/project [info] Установить текущий проект на Akka Pi Calculation (в файле сборки:/Пользователи/leo/projects/scala/test/akka-calculate-pi /) [info] Запуск calcpi.MCpi 100000000/tmp Число итераций: 100000000 Pi рассчитывается как: 3.1415972 Общее время: 1715 (миллисекунды) [успех] Общее время: 2 сек, завершено 20 января 2017 3:28:17 PM

Использование Akka

Количество работников = 4:

$ SBT "runMain calcpi.MCpi 100000000/TMP"

определение

[Информация] Загрузка проекта из/Users/Лев/проекты/Scala/тест/Акка-calculate- pi/project [info] Установить текущий проект в Akka Pi Calculation (в файле сборки:/Пользователи/leo/projects/scala/test/akka-calculate-pi /) [info] Запуск calcpi.MCpi 100000000/tmp Запуск Мастер

Pi approximation:  3.14110116 
Calculation time: 423.0 

[успех] Общее время: 1 с, завершено 20 января 2017 3:35:25 PM

$ SBT "runMain calcpi.MCpi 100000000/TMP"

определение

[Информация] Загрузка проекта из/Users/Лев/проекты/Scala/тест/Акка-стоимость -pi/project [info] Установить текущий проект на Akka Pi Calculation (в файле сборки:/Пользователи/leo/projects/scala/test/akka-calculate-pi /) [info] Запуск calcpi.MCpi 100000000/tmp Запуск Master

Pi approximation:  3.14181316 
Calculation time: 440.0 

[успех] Общее время: 1 сек, завершенные 20 января 2017 3:35:34 PM

Количество рабочих = 2:

$ SBT "runMain calcpi.MCpi 100000000/TMP"

определение

[Информация] Загрузка проекта из/Users/Лев/проекты/Scala/тест/Акка-calculate- pi/project [info] Установить текущий проект в Akka Pi Calculation (в файле сборки:/Пользователи/leo/projects/scala/test/akka-calculate-pi /) [info] Запуск calcpi.MCpi 100000000/tmp Запуск Мастер

Pi approximation:  3.14162344 
Calculation time: 766.0 

[успел] Общее время: 2 с, завершено 20 января 2017 3:36:34 PM

$ SBT "runMain calcpi.MCpi 100000000/TMP"

определение

[Информация] Загрузка проекта из/Users/Лев/проекты/Scala/тест/Акка-стоимость -pi/project [info] Установить текущий проект на Akka Pi Calculation (в файле сборки:/Пользователи/leo/projects/scala/test/akka-calculate-pi /) [info] Запуск calcpi.MCPI 100000000/TMP Запуск Master

Pi approximation:  3.14182148 
Calculation time: 787.0 

[успех] Общее время: 2 сек, завершенные 20 Январь, 2017 3:36:43 PM

+0

Вы были совершенно правы, вопрос был генератором случайных чисел, который я использовал. Там также большая разница в скорости с большим масштабом. Большое спасибо за это :) – Cipher478

+0

@ Cipher478 Рад, что это помогает. Да, учитывая, что симуляция Монте-Карло включает в себя генерацию случайных чисел в масштабе, для производительности я бы предложил использовать [генератор случайных чисел на основе потоков] (https://docs.oracle.com/javase/8/docs/api/java/ util/concurrent/ThreadLocalRandom.html), особенно когда на многоядерную машину задействованы несколько участников. –