2016-08-11 3 views
1

Я новичок в mpi4py. Я написал код для обработки большого массива numpy data несколькими процессорами. Поскольку я не могу предоставить входной файл, я упоминаю форму data. Форма data - [3000000,15] и содержит строковый тип данных.mpi4py Gatherv лицом KeyError: '0'

from mpi4py import MPI 
import numpy as np 
import datetime as dt 
import math as math 


comm = MPI.COMM_WORLD 
numprocs = comm.size 
rank = comm.Get_rank() 
fname = "6.binetflow" 
data = np.loadtxt(open(fname,"rb"), dtype=object, delimiter=",", skiprows=1) 
X = data[:,[0,1,3,14,6,6,6,6,6,6,6,6]] 
num_rows = math.ceil(len(X)/float(numprocs)) 
X = X.flatten() 
sendCounts = list() 
displacements = list() 
for p in range(numprocs): 
    if p == (numprocs-1): #for last processor 
     sendCounts.append(int(len(X) - (p*num_rows*12))) 
     displacements.append(int(p*num_rows*12)) 
     break 
    sendCounts.append(int(num_rows*12)) 
    displacements.append(int(p*sendCounts[p])) 
sendbuf = np.array(X[displacements[rank]: (displacements[rank]+sendCounts[rank])]) 

## Each processor will do some task on sendbuf 

if rank == 0: 
    recvbuf = np.empty(sum(sendCounts), dtype=object) 
else: 
    recvbuf = None 

print("sendbuf: ",sendbuf) 
comm.Gatherv(sendbuf=sendbuf, recvbuf=(recvbuf, sendCounts), root=0) 
if rank == 0: 
    print("Gathered array: {}".format(recvbuf)) 

Но я столкнулся ниже ошибки:

Traceback (most recent call last): 
    File "hello.py", line 36, in <module> 
    comm.Gatherv(sendbuf=sendbuf, recvbuf=(recvbuf, sendCounts), root=0) 
    File "MPI/Comm.pyx", line 602, in mpi4py.MPI.Comm.Gatherv (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:97993) 
    File "MPI/msgbuffer.pxi", line 525, in mpi4py.MPI._p_msg_cco.for_gather (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:34678) 
    File "MPI/msgbuffer.pxi", line 446, in mpi4py.MPI._p_msg_cco.for_cco_send (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:33938) 
    File "MPI/msgbuffer.pxi", line 148, in mpi4py.MPI.message_simple (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:30349) 
    File "MPI/msgbuffer.pxi", line 93, in mpi4py.MPI.message_basic (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:29448) 
KeyError: 'O' 
Traceback (most recent call last): 
    File "hello.py", line 36, in <module> 
    comm.Gatherv(sendbuf=sendbuf, recvbuf=(recvbuf, sendCounts), root=0) 
    File "MPI/Comm.pyx", line 602, in mpi4py.MPI.Comm.Gatherv (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:97993) 
    File "MPI/msgbuffer.pxi", line 525, in mpi4py.MPI._p_msg_cco.for_gather (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:34678) 
    File "MPI/msgbuffer.pxi", line 446, in mpi4py.MPI._p_msg_cco.for_cco_send (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:33938) 
    File "MPI/msgbuffer.pxi", line 148, in mpi4py.MPI.message_simple (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:30349) 
    File "MPI/msgbuffer.pxi", line 93, in mpi4py.MPI.message_basic (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:29448) 
KeyError: 'O' 
Traceback (most recent call last): 
    File "hello.py", line 36, in <module> 
    comm.Gatherv(sendbuf=sendbuf, recvbuf=(recvbuf, sendCounts), root=0) 
    File "MPI/Comm.pyx", line 602, in mpi4py.MPI.Comm.Gatherv (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:97993) 
    File "MPI/msgbuffer.pxi", line 525, in mpi4py.MPI._p_msg_cco.for_gather (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:34678) 
    File "MPI/msgbuffer.pxi", line 446, in mpi4py.MPI._p_msg_cco.for_cco_send (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:33938) 
    File "MPI/msgbuffer.pxi", line 148, in mpi4py.MPI.message_simple (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:30349) 
    File "MPI/msgbuffer.pxi", line 93, in mpi4py.MPI.message_basic (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:29448) 
KeyError: 'O' 
Traceback (most recent call last): 
    File "hello.py", line 36, in <module> 
    comm.Gatherv(sendbuf=sendbuf, recvbuf=(recvbuf, sendCounts), root=0) 
    File "MPI/Comm.pyx", line 602, in mpi4py.MPI.Comm.Gatherv (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:97993) 
    File "MPI/msgbuffer.pxi", line 516, in mpi4py.MPI._p_msg_cco.for_gather (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:34587) 
    File "MPI/msgbuffer.pxi", line 466, in mpi4py.MPI._p_msg_cco.for_cco_recv (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:34097) 
    File "MPI/msgbuffer.pxi", line 261, in mpi4py.MPI.message_vector (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:31977) 
    File "MPI/msgbuffer.pxi", line 93, in mpi4py.MPI.message_basic (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:29448) 
KeyError: 'O' 

Любая помощь будет оценена. Я застрял в этой проблеме в течение длительного времени.

Благодаря

ответ

0

Проблема заключается в dtype=object.

Mpi4py предоставляет два вида функций связи, те, чьи имена начинаются с буквы верхнего регистра, например. Scatter, и те, чьи имена начинаются с строчной буквы, например. scatter. From the Mpi4py documentation:

In MPI for Python, the Bcast(), Scatter(), Gather(), Allgather() and Alltoall() methods of Comm instances provide support for collective communications of memory buffers. The variants bcast(), scatter(), gather(), allgather() and alltoall() can communicate generic Python objects.

Что не ясно из этого является то, что даже если Numpy массивов якобы разоблачать буфера памяти, буфера, по-видимому должны быть одним из небольшого набора примитивных типов данных, и, конечно, не работают с общим объекты. Сравните следующие два фрагмента кода:

from mpi4py import MPI 
import numpy 

Comm = MPI.COMM_WORLD 
Size = Comm.Get_size() 
Rank = Comm.Get_rank() 

if Rank == 0: 
    Data = numpy.empty(Size, dtype=object) 
else: 
    Data = None 

Data = Comm.scatter(Data, 0) # I work fine! 

print("Data on rank %d: " % Rank, Data) 

и

from mpi4py import MPI 
import numpy 

Comm = MPI.COMM_WORLD 
Size = Comm.Get_size() 
Rank = Comm.Get_rank() 

if Rank == 0: 
    Data = numpy.empty(Size, dtype=object) 
else: 
    Data = None 

Datb = numpy.empty(1, dtype=object) 

Comm.Scatter(Data, Datb, 0) # I throw KeyError! 

print("Datb on rank %d: " % Rank, Datb) 

К сожалению, Mpi4py не дает scatterv. Из того же места в документации:

The vector variants (which can communicate different amounts of data to each process) Scatterv(), Gatherv(), Allgatherv() and Alltoallv() are also supported, they can only communicate objects exposing memory buffers.

Это не исключение из прописных против строчных правил для dtypes, либо:

from mpi4py import MPI 
import numpy 

Comm = MPI.COMM_WORLD 
Size = Comm.Get_size() 
Rank = Comm.Get_rank() 

if Rank == 0: 
    Data = numpy.empty(2*Size+1, dtype=numpy.dtype('float64')) 
else: 
    Data = None 

if Rank == 0: 
    Datb = numpy.empty(3, dtype=numpy.dtype('float64')) 
else: 
    Datb = numpy.empty(2, dtype=numpy.dtype('float64')) 

Comm.Scatterv(Data, Datb, 0) # I work fine! 

print("Datb on rank %d: " % Rank, Datb) 

против

from mpi4py import MPI 
import numpy 

Comm = MPI.COMM_WORLD 
Size = Comm.Get_size() 
Rank = Comm.Get_rank() 

if Rank == 0: 
    Data = numpy.empty(2*Size+1, dtype=object) 
else: 
    Data = None 

if Rank == 0: 
    Datb = numpy.empty(3, dtype=object) 
else: 
    Datb = numpy.empty(2, dtype=object) 

Comm.Scatterv(Data, Datb, 0) # I throw KeyError! 

print("Datb on rank %d: " % Rank, Datb) 

Вы» К сожалению, вам необходимо написать свой код, чтобы он мог использовать scatter, что потребовало того же SendCount для каждого процесса, или более примитивных функций связи «точка-точка», или использовать некоторые параметры параллельная установка, отличная от Mpi4py.

Использование Mpi4py 2.0.0, текущей стабильной версии на момент написания этой статьи.