2016-03-18 2 views
4

Как я могу изменить следующий код (адаптированный от http://materials.jeremybejarano.com/MPIwithPython/pointToPoint.html), чтобы каждый экземпляр comm.Send был получен root = 0 и напечатан выход. На данный момент принимается только первая команда отправки.Получение нескольких команд отправки с использованием mpi4py

#passRandomDraw.py 
import numpy 
from mpi4py import MPI 
from mpi4py.MPI import ANY_SOURCE 
import numpy as np 

comm = MPI.COMM_WORLD 
rank = comm.Get_rank() 

if rank == 0: 
    randNum = numpy.zeros(1) 
    print "Process before receiving random numbers" 


else: 
    for i in range(0,np.random.randint(1,10),1): 
     randNum = numpy.zeros(1) 
     randNum = numpy.random.random_sample(1) 
     print "Process", rank, "iteration", i, "drew the number", randNum[0] 
     comm.Send(randNum, dest=0) 


if rank == 0: 
    comm.Recv(randNum, ANY_SOURCE) 
    print "Process", rank, "received the number", randNum[0] 
+0

Просто уточнить/подтвердить. Правильно ли я предполагаю, что у вас нет возможности узнать перед собой, сколько сообщений будет отправлено по каждому рангу, даже по самому рангу? – Zulan

+0

В идеале я хотел бы отправлять сообщения из каждого ранга, не зная, сколько сообщений отправит ранг. Если это невозможно, можно было бы рассчитать в пределах ранга, сколько сообщений будет отправлено, но это будет отличаться для каждого ранга. – 218

ответ

3

Если вы не знаете, сколько сообщений вы будете отправлять, то вы должны ввести сообщение, отмечающий конец сообщения. Вы можете использовать это в общих чертах, используя специальные теги. Для того, чтобы избежать предоставлений несовпадения буфера для сообщения завершения, вы можете использовать probe проверку, какие сообщения приходят в

tag_data = 42 
tag_end = 23 

if rank == 0: 
    randNum = numpy.zeros(1) 
    print "Process before receiving random numbers" 
else: 
    for i in range(0,np.random.randint(1,10),1): 
     randNum = numpy.zeros(1) 
     randNum = numpy.random.random_sample(1) 
     print "Process", rank, "iteration", i, "drew the number", randNum[0] 
     comm.Send(randNum, dest=0, tag=tag_data) 
    # send the termination message. Using the lower-case interface is simpler 
    comm.send(None, dest=0, tag=tag_end) 

if rank == 0: 
    # For debugging it might be better to use a list of still active procsses 
    remaining = comm.Get_size() - 1 
    while remaining > 0: 
     s = MPI.Status() 
     comm.Probe(status=s) 
     # make sure we post the right kind of message 
     if s.tag == tag_data: 
      comm.Recv(randNum, s.source, tag=tag_data) 
      print "Process ", s.source, " received the number", randNum[0] 
     elif s.tag == tag_end: 
      # don't need the result here 
      print "Process ", rank, " is done" 
      comm.recv(source=s.source, tag=tag_end) 
      remaining -= 1 

Есть много вариантов этого. Например, если вы знаете, что сообщение является последним сообщением, вы можете объединить сообщение о завершении.

+0

На данный момент это не удается для меня в строке 'comm.probe (status = s)' с ошибкой 'comm.probe (status = s) AttributeError: объект 'mpi4py.MPI.Intracomm' не имеет атрибута 'probe'' , Кроме того, есть ли значение для номеров тегов, которые вы выбрали, или просто нужно идентифицировать 'tag_end' отдельно для' tag_data' и любое число будет делать? Или определенные числа зарезервированы для определенных процессов? – 218

+0

Может быть, 'probe' был представлен в версии 2.0.0 mpi4py. Просто замените верхний регистр 'Probe', не имеет значения. Значения тега полностью произвольны, они просто должны быть различны. Фактические значения в моих примерах - ссылки на [42] (https://en.wikipedia.org/wiki/42_%28number%29#Hitchhiker.27s_Guide_to_the_Galaxy) и [23] (https://en.wikipedia.org/ wiki/23_% 28film% 29), хотя я на самом деле не видел последнего. – Zulan

+0

Кажется, что 'comm.probe' пришел только в mpi4py версии 2.0.0. У меня есть установка 'python3', использующая это, и исходный код, который вы опубликовали (очевидно, с измененными операциями' print'). Однако с более старой версией mpi4py 'comm.Probe' процесс зависает. Также в строке 'print" Process ", rank," done "' this всегда печатает 'rank = 0'. Предположительно, заменяя 'rank' на' s.source', будет напечатан 'rank', который только что завершился и отправлен' tag_end'? – 218

1

Если каждый процесс знает количество отправляемых сообщений, следующие шаги могут быть разработаны, чтобы решить эту проблему:

1) Снизить количество сообщений для отправки в корневой процесс. Каждый процесс отправляет корню количество сообщений, которые он отправит позже. Эта операция называется редукцией и может быть выполнено с помощью функции comm.reduce(...)

2) Получить все сообщения о процессе 0.

Вот код, основанный на ваших, что должно сделать трюк. Это может быть пробежало mpirun -np 4 python main.py

#passRandomDraw.py 
import numpy 
from mpi4py import MPI 
from mpi4py.MPI import ANY_SOURCE 
import numpy as np 

comm = MPI.COMM_WORLD 
rank = comm.Get_rank() 
size = comm.Get_size() 

#just in case, if numpy.random is seed with 
np.random.seed(np.random.randint(np.iinfo(np.uint32).min,np.iinfo(np.uint32).max)+rank) 

if rank == 0: 
    randNum = numpy.zeros(1) 
    print "Process before receiving random numbers" 
    nb=np.empty((1,),dtype=int) 
    nb0=np.zeros((1,),dtype=int) 
    comm.Reduce([nb0, MPI.INT],[nb, MPI.INT],op=MPI.SUM, root=0) #sums the total number of random number from every process on rank 0, in nb. 
    #print "rank"+str(rank)+" nb "+str(nb) 
else: 
    nb=np.empty((1,),dtype=int) 
    nb[0]=np.random.randint(1,10) 
    #print "rank"+str(rank)+" nb "+str(nb) 
    comm.Reduce([nb, MPI.INT],None,op=MPI.SUM, root=0) 
    for i in range(0,nb[0],1): 
     randNum = numpy.zeros(1) 
     randNum = numpy.random.random_sample(1) 
     print "Process", rank, "iteration", i, "drew the number", randNum[0] 
     comm.Send(randNum, dest=0) 



if rank == 0: 
    for i in range(nb[0]): #receives nb message, each one with its int. 
     comm.Recv(randNum, ANY_SOURCE) 
     print "Process", rank, "received the number", randNum[0] 

Согласно documentation of numpy.random() генератору номера Вихря Мерсена псевдослучайному первоначально посеянный рядом, извлеченным из /dev/urandom (или аналога Windows), если таковые имеются, или семян от часов в противном случае. Следовательно, в последнем случае все процессы могут получать одно и то же семя и генерировать одни и те же случайные числа. Чтобы этого не произошло, я добавил следующую строку:

np.random.seed(np.random.randint(np.iinfo(np.uint32).min,np.iinfo(np.uint32).max)+rank)