2014-01-14 6 views
4

Мне нужна помощь в выяснении того, как автоматизировать тестовое усилие в python.Как автоматизировать зависимую от среды переменную BLAS в python/numpy?

Я тестирую эффекты потоковой передачи в библиотечных вызовах BLAS через numpy в python. В среде linux потоки в OpenBLAS контролируются с помощью переменной окружения OMP_NUM_THREADS. Я хочу сделать тест, где я увеличиваю OMP_NUM_THREADS от 1 до максимального значения, время подпрограммы при каждом количестве потоков, а затем, наконец, манипулирует совокупной синхронизацией для всех подсчетов потоков.

Проблема заключается в следующем. Переменные окружения могут быть заданы в python, но они влияют только на подпроцессы или подоболочки. Так что я могу правильно запустить свой тест с помощью следующего кода драйвера:

#!/usr/bin/env python                          # driver script for thread test 
import os 

thread_set =[1,2,4,8,16] 
for thread in thread_set: 

    os.environ['OMP_NUM_THREADS']='{:d}'.format(thread) 
    os.system("echo $OMP_NUM_THREADS") 
    os.system("numpy_test") 

и numpy_test скрипт:

#!/usr/bin/env python 
#timing test for numpy dot product (using OpenBLAS)              
#based on http://stackoverflow.com/questions/11443302/compiling-numpy-with-openblas-integration 
import sys 
import timeit 

setup = "import numpy; x = numpy.random.random((1000,1000))" 
count = 5 

t = timeit.Timer("numpy.dot(x, x.T)", setup=setup) 
dot_time = t.timeit(count)/count 
print("dot: {:7.3g} sec".format(dot_time)) 

, но анализируя это очень ручной процесс.

В частности, я не могу вернуть значение dot_time от numpy_test до моей внешней процедуры оболочки, поэтому я не могу анализировать результаты своего теста любым автоматическим способом. В качестве примера я хотел бы построить dot_time против количества потоков или оценить, является ли dot_time/количество потоков постоянным.

При попытке сделать подобный тест полностью в пределах экземпляра питона, определив тестовую Python функции (избегая os.system() подход выше), а затем запустить тестовую функцию внутри цикла thread in thread_set, то все экземпляры пробной функции наследует то же значение для OMP_NUM_THREADS (родительской оболочки python). Так что этот тест не пройден:

#!/usr/bin/env python 
#attempt at testing threads that doesn't work 
#(always uses inherited value of OMP_NUM_THREADS) 
import os 

import sys 
import timeit 

def test_numpy(): 
    setup = "import numpy; x = numpy.random.random((1000,1000))" 
    count = 5 

    t = timeit.Timer("numpy.dot(x, x.T)", setup=setup) 
    dot_time = t.timeit(count)/count 
    print("dot: {:7.3g} sec".format(dot_time)) 
    return dot_time 

thread_set =[1,2,4,8,16] 
for thread in thread_set: 
    os.environ['OMP_NUM_THREADS']='{:d}'.format(thread) 
    os.system("echo $OMP_NUM_THREADS") 
    time_to_run = test_numpy() 
    print(time_to_run) 

Это терпит неудачу в том, что каждый экземпляр thread занимает столько же времени, как и test_numpy() всегда наследует значение OMP_NUM_THREADS в родительской среде, а не значение, установленное с помощью os.environ(). Если бы что-то подобное сработало, было бы тривиально делать анализ, который мне нужно сделать.

В реальном тесте я буду работать с несколькими 1000 перестановками, поэтому автоматизация является ключевой. Учитывая, что я оценил бы ответ на любой из этих вопросов:

  1. Как бы вы вернуть значение (dot_time) из подпроцесса, как это? Есть ли более элегантное решение, чем чтение/запись файла?

  2. Есть ли лучший способ структурировать этот тип (зависящий от переменной среды) тест?

Заранее спасибо.

ответ

2

Вы можете сделать что-то вроде этого:

import subprocess 

os.environ['OMP_NUM_THREADS'] = '{:d}'.format(thread) 
proc = subprocess.Popen(["numpy_test"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) 
stdout, stderr = proc.communicate() 

Тогда вы будете иметь выход в numpy_test сценария в стандартный вывод. В целом я считаю, что subprocess.call и subprocess.Popen предпочитают более os.system.

+0

Это работает в паре с синтаксическим разрывом строк. Замечание о выполнении: опечатка в 'proc.comunicate()' должна быть 'proc.communicate()'. – bpbrown

1

Если вы хотите получить выходные данные из подпроцесса, используйте subprocess.check_output, например.заменить

os.system("numpy_test") 

с

dot_output = subprocess.check_output(["numpy_test"]) 
dot_time = ... # extract time from dot_output