2016-06-21 2 views
1

Я хочу использовать многопроцессорность в Python для ускорения цикла while.Параллелизация/многопроцессорность условного цикла

Подробнее:
У меня есть матрица (образцы * функции). Я хочу выбрать x подмножества образцов, значения которых при случайном подмножестве признаков не равны определенному значению (в данном случае -1).

Мой серийный код:

np.random.seed(43) 
datafile = '...' 
df = pd.read_csv(datafile, sep=" ", nrows = 89) 

no_feat = 500 
no_samp = 5 
no_trees = 5 
i=0 
iter=0 


samples = np.zeros((no_trees, no_samp)) 
features = np.zeros((no_trees, no_feat)) 

while i < no_trees: 
    rand_feat = np.random.choice(df.shape[1], no_feat, replace=False) 
    iter_order = np.random.choice(df.shape[0], df.shape[0], replace=False) 

    samp_idx = [] 
    a=0 

#-------------- 
    #how to run in parallel? 

    for j in iter_order: 
     pot_samp = df.iloc[j, rand_feat] 
     if len(np.where(pot_samp==-1)[0]) == 0: 
      samp_idx.append(j) 
     if len(samp_idx) == no_samp: 
      print a 
      break 
     a+=1 

#-------------- 

    if len(samp_idx) == no_samp: 
     samples[i,:] = samp_idx 
     features[i, :] = rand_feat 
     i+=1 
    iter+=1 
    if iter>1000: #break if subsets cannot be found 
     break 

Поиск для установки образцов является потенциально дорогостоящей частью (J цикла), который теоретически может работать параллельно. В некоторых случаях нет необходимости перебирать все образцы, чтобы найти достаточно большое подмножество, поэтому я выхожу из цикла, как только подмножество достаточно велико.
Я изо всех сил пытаюсь найти реализацию, которая позволила бы проверять, сколько уже сформированных действительных результатов. Возможно ли это?

Я использовал joblib раньше. Если я правильно понимаю, это использует методы многопроцессорности pool как бэкэнд, который работает только для отдельных задач? Я думаю, что queues может оказаться полезным, но пока я не смог реализовать их.

+0

Использование '' joblib' или multiprocessing.pool' имеет смысл. Я запускаю процесс на каждое ядро ​​и создаю общий счетчик, защищенный «блокировкой» или реализованный как атомное целое число, увеличивая его до тех пор, пока он не достигнет определенного счета (с учетом дубликатов), а затем все процессы завершатся, возвращая их результаты. (Для этого вы, вероятно, можете использовать 'apply_async()'). – advance512

+1

@ advance512 Спасибо, что предоставили мне эти методы для изучения. – Dahlai

ответ

0

Я нашел рабочее решение. Я решил запустить цикл while параллельно и взаимодействовать с разными процессами через общий счетчик. Кроме того, я векторизовал поиск подходящих образцов.

Векторизация дает ускорение в 300 раз и работает на 4 ядрах, ускоряет вычисление ~ в два раза.

Сначала я попытался реализовать отдельные процессы и поместить результаты в queue. Оказывается, они не предназначены для хранения больших объемов данных.

Если кто-то видит еще одно узкое место в этом коде, я был бы рад, если бы кто-то указал на это.

С моим в основном несуществующим знанием о параллельных вычислениях мне было очень сложно разобраться в этом вместе, тем более, что пример в Интернете все очень прост. Я узнал много, хотя =)

Мой код:

import numpy as np 
import pandas as pd 
import itertools 
from multiprocessing import Pool, Lock, Value 
from datetime import datetime 
import settings 


val = Value('i', 0) 
worker_ID = Value('i', 1) 
lock = Lock() 

def findSamp(no_trees, df, no_feat, no_samp): 
    lock.acquire() 
    print 'starting worker - {0}'.format(worker_ID.value) 
    worker_ID.value +=1 
    worker_ID_local = worker_ID.value 
    lock.release() 

    max_iter = 100000 
    samp = [] 
    feat = [] 
    iter_outer = 0 
    iter = 0 
    while val.value < no_trees and iter_outer<max_iter: 
     rand_feat = np.random.choice(df.shape[1], no_feat, replace=False 

     #get samples with random features from dataset; 
     #find and select samples that don't have missing values in the random features 
     samp_rand = df.iloc[:,rand_feat] 
     nan_idx = np.unique(np.where(samp_rand == -1)[0]) 
     all_idx = np.arange(df.shape[0]) 
     notnan_bool = np.invert(np.in1d(all_idx, nan_idx)) 
     notnan_idx = np.where(notnan_bool == True)[0] 

     if notnan_idx.shape[0] >= no_samp: 
      #if enough samples for random feature subset, select no_samp samples randomly 
      notnan_idx_rand = np.random.choice(notnan_idx, no_samp, replace=False) 
      rand_feat_rand = rand_feat 

      lock.acquire() 
      val.value += 1 
      #x = val.value 
      lock.release() 
      #print 'no of trees generated: {0}'.format(x) 
      samp.append(notnan_idx_rand) 
      feat.append(rand_feat_rand) 

     else: 
      #increase iter_outer counter if no sample subset could be found for random feature subset 
      iter_outer += 1 

     iter+=1 
    if iter >= max_iter: 
     print 'exiting worker{0} because iter >= max_iter'.format(worker_ID_local) 
    else: 
     print 'worker{0} - finished'.format(worker_ID_local) 
    return samp, feat 

def initialize(*args): 
    global val, worker_ID, lock 
    val, worker_ID, lock = args 

def star_findSamp(i_df_no_feat_no_samp): 
    return findSamp(*i_df_no_feat_no_samp) 


if __name__ == '__main__': 
    np.random.seed(43) 
    datafile = '...' 
    df = pd.read_csv(datafile, sep=" ", nrows = 89) 
    df = df.fillna(-1) 
    df = df.iloc[:, 6:] 

    no_feat = 700 
    no_samp = 10 
    no_trees = 5000 


    startTime = datetime.now() 
    print 'starting multiprocessing' 
    ncores = 4 
    p = Pool(ncores, initializer=initialize, initargs=(val, worker_ID, lock)) 
    args = itertools.izip([no_trees]*ncores, itertools.repeat(df), itertools.repeat(no_feat), itertools.repeat(no_samp)) 

    result = p.map(star_findSamp, args)#, callback=log_result) 
    p.close() 
    p.join() 

    print '{0} sample subsets for tree training have been found'.format(val.value) 

    samples = [x[0] for x in result if x != None] 
    samples = np.vstack(samples) 
    features = [x[1] for x in result if x != None] 
    features = np.vstack(features) 
    print datetime.now() - startTime