1

Я пытаюсь сравнить время работы непараллельной версии и параллельной версии функции. Проблема в том, что, хотя параллельная функция хорошо работает с пакетом потоковой передачи, процессы просто не запускаются после переключения на пакет многопроцессорности. Мне интересно, вызвано ли это моим компилятором или чем-то еще. Может ли кто-нибудь запустить мой код, чтобы увидеть, работает ли он в другой среде? И если это не так, в чем проблема в моем коде?Многопроцессорная работа, не работающая для функции вычисления кросс-произведения матриц

import numpy as np 
from multiprocessing import Process 

def single_row(a,b,output): 
    for j in range(len(b[0])): 
     for k in range(len(a)): 
      output[j]=output[j]+a[k]*b[k][j] 

#Parallel Matrix Cross Multiplication 
def cross_parallel(a,b): 
    if len(a[0])==len(b): 
     tasks=[None]*len(a) 
     T=np.array([[0]*len(b[0])]*len(a)) 
     for i in range(len(a)): 
      tasks[i]=Process(target=single_row,args=(a[i],b,T[i])) 
     for task in tasks: 
      task.start() 
     for task in tasks: 
      task.join() 
     return T 
    else: 
     print 'Error: Invalid Matrices' 

#Non-parallel Matrix Cross Multiplication 
def cross_basic(a,b): 
    if len(a[0])==len(b): 
     T=np.array([[0]*len(b[0])]*len(a)) 
     for i in range(len(a)): 
      for j in range(len(b[0])): 
       for k in range(len(a[0])): 
        T[i][j]=T[i][j]+a[i][k]*b[k][j] 
     return T 
    else: 
     print 'Error: Invalid Matrices' 

if __name__ == '__main__':  
    x=[[1,2,3,4],[5,6,7,8],[9,10,11,12],[13,14,15,16]] 
    y=[[1,2,3,4],[5,6,7,8],[9,10,11,12],[13,14,15,16]] 
    print cross_basic(x,y) 
    print cross_parallel(x,y) 

Результат:

[[ 90 100 110 120] 
[202 228 254 280] 
[314 356 398 440] 
[426 484 542 600]] 
[[0 0 0 0] 
[0 0 0 0] 
[0 0 0 0] 
[0 0 0 0]] 

версия с использованием многопоточности пакет, который работает (отличается только в строке 15):

import numpy as np 
from threading import Thread 

def single_row(a,b,output): 
    for j in range(len(b[0])): 
     for k in range(len(a)): 
      output[j]=output[j]+a[k]*b[k][j] 

#Parallel Matrix Cross Multiplication 
def cross_parallel(a,b): 
    if len(a[0])==len(b): 
     tasks=[None]*len(a) 
     T=np.array([[0]*len(b[0])]*len(a)) 
     for i in range(len(a)): 
      tasks[i]=Thread(target=single_row,args=(a[i],b,T[i])) 
     for task in tasks: 
      task.start() 
     for task in tasks: 
      task.join() 
     return T 
    else: 
     print 'Error: Invalid Matrices' 

#Non-parallel Matrix Cross Multiplication 
def cross_basic(a,b): 
    if len(a[0])==len(b): 
     T=np.array([[0]*len(b[0])]*len(a)) 
     for i in range(len(a)): 
      for j in range(len(b[0])): 
       for k in range(len(a[0])): 
        T[i][j]=T[i][j]+a[i][k]*b[k][j] 
     return T 
    else: 
     print 'Error: Invalid Matrices' 

if __name__ == '__main__':  
    x=[[1,2,3,4],[5,6,7,8],[9,10,11,12],[13,14,15,16]] 
    y=[[1,2,3,4],[5,6,7,8],[9,10,11,12],[13,14,15,16]] 
    print cross_basic(x,y) 
    print cross_parallel(x,y) 

Результат:

[[ 90 100 110 120] 
[202 228 254 280] 
[314 356 398 440] 
[426 484 542 600]] 
[[ 90 100 110 120] 
[202 228 254 280] 
[314 356 398 440] 
[426 484 542 600]] 

ответ

1

Когда вы используя потоки, матрицу результатов T разделяется в потоках (это означает, что это в основном один и тот же объект, используя одни и те же слоты памяти). Таким образом, модификация T у ребенка Thread изменит локальную версию T и вы получите правильный результат.

Для подпроцессов каждый ребенок Process получает новую копию T. Таким образом, модификация T в подпроцессе не изменяет его локальную версию. Чтобы получить правильный результат, вам нужно отправить результат вычисления, используя, например, Queue. Но вы должны быть осторожны, так как порядок, в котором вы возвращаете результаты, не является детерминированным.

import numpy as np 
from multiprocessing import Process, Queue 


def single_row(a, b, idx, q): 
    N = len(b[0]) 
    output = np.zeros(N) 
    for j in range(len(b[0])): 
     for k in range(len(a)): 
      output[j] = output[j]+a[k]*b[k][j] 
    q.put((idx, output)) 

# Parallel Matrix Cross Multiplication 


def cross_parallel(a, b): 
    M = len(a) 
    q = Queue() 
    if len(a[0]) == len(b): 
     tasks = [None]*M 
     T = np.array([[0]*len(b[0])]*len(a)) 
     for i in range(M): 
      tasks[i] = Process(target=single_row, args=(a[i], b, i, q)) 
     for task in tasks: 
      task.start() 
     T = [] 
     for i in range(M): 
      T += [q.get()] 
     for task in tasks: 
      task.join() 
     T.sort() 
     T = np.array([v[1] for v in T]) 
     return T 
    else: 
     print('Error: Invalid Matrices') 

# Non-parallel Matrix Cross Multiplication 


def cross_basic(a, b): 
    if len(a[0]) == len(b): 
     T = np.array([[0]*len(b[0])]*len(a)) 
     for i in range(len(a)): 
      for j in range(len(b[0])): 
       for k in range(len(a[0])): 
        T[i][j] = T[i][j]+a[i][k]*b[k][j] 
     return T 
    else: 
     print('Error: Invalid Matrices') 

if __name__ == '__main__': 
    x = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]] 
    y = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]] 
    print(cross_basic(x, y)) 
    print(cross_parallel(x, y)) 

Этот подход может быть непросто использовать, например, если вы не опорожнить очередь до вступления в процессы, вы можете вызвать тупики в вашей программе. Я рекомендую посмотреть multiprocessing.Pool или concurrents.futures.ProcessPoolExecutor, чтобы получить протокол высокого уровня, который лучше управляет связью/количеством процессов.

def single_row2(a, b): 
    N = len(b[0]) 
    output = np.zeros(N) 
    for j in range(len(b[0])): 
     for k in range(len(a)): 
      output[j] = output[j]+a[k]*b[k][j] 
    return output 

def cross_parallel2(a, b): 
    import itertools 
    from concurrent.futures import ProcessPoolExecutor 
    executor = ProcessPoolExecutor(max_workers=4) 
    M = len(a) 
    if len(a[0]) == len(b): 
     res = executor.map(single_row2, a, itertools.repeat(b)) 

     return np.array([row for row in res]) 
    else: 
     print('Error: Invalid Matrices')