2016-11-22 11 views
0

Я пытаюсь построить многопроцессорность в python, чтобы уменьшить скорость вычислений, но, похоже, после многопроцессорной обработки общая скорость вычислений значительно снизилась. Я создал 4 разных процесса и разделил dataFrame на 4 разных кадра данных, которые будут вкладом в каждый процесс. После выбора времени каждого процесса кажется, что накладные расходы являются значительными, и было интересно, есть ли способ уменьшить эти накладные расходы.Как сократить время многопроцессорности в python

Я использую windows7, python 3.5, и моя машина имеет 8 ядер.

def doSomething(args, dataPassed,): 

    processing data, and calculating outputs 

def parallelize_dataframe(df, nestedApply): 
    df_split = np.array_split(df, 4) 
    pool = multiprocessing.Pool(4) 
    df = pool.map(nestedApply, df_split) 
    print ('finished with Simulation') 
    time = float((dt.datetime.now() - startTime).total_seconds()) 

    pool.close() 
    pool.join() 

def nestedApply(df): 

    func2 = partial(doSomething, args=()) 
    res = df.apply(func2, axis=1) 
    res = [output Tables] 
    return res 

if __name__ == '__main__': 

data = pd.read_sql_query(query, conn) 

parallelize_dataframe(data, nestedApply) 
+2

Можете ли вы перечислить, сколько длинных одиночных потоков было выполнено против многопроцессорности? – Fruitspunchsamurai

+0

Сколько у вас процессоров/ядер (реальных, а не гиперпотоков)? Это похоже на интенсивную работу с ЦП, поэтому расщепление на большее количество ядер просто замедлит работу. Кроме того, насколько велики кадры данных и насколько дорого стоит «doSomething»? Чтобы получить фрейм данных для каждого подпроцесса, он должен быть сериализован (через рассол) и десериализован, поэтому, если кадры большие и 'doSomething' дешево, вы действительно увидите большую часть времени, потраченного на накладные расходы. –

+0

@ Fruitspunchsamurai Потребовалось 26 минут, чтобы запустить одиночную нить, в то время как для запуска функции отображения и всего 71 минуты всего потребовалось 33 минуты. – Hojin

ответ

0

Я бы предложил использовать очереди вместо предоставления вашей DataFrame в виде кусков. Вам нужно много ресурсов для копирования каждого фрагмента, и для этого требуется довольно много времени. У вас может быть нехватка памяти, если ваш DataFrame действительно большой. Используя очереди, вы можете воспользоваться быстрыми итераторами в пандах. Вот мой подход. Накладные расходы уменьшаются со сложностью ваших работников. К сожалению, моим работникам очень просто показать это, но sleep немного усложняет сложность.

import pandas as pd 
import multiprocessing as mp 
import numpy as np 
import time 


def worker(in_queue, out_queue): 
    for row in iter(in_queue.get, 'STOP'): 
     value = (row[1] * row[2]/row[3]) + row[4] 
     time.sleep(0.1) 
     out_queue.put((row[0], value)) 

if __name__ == "__main__": 
    # fill a DataFrame 
    df = pd.DataFrame(np.random.randn(1e5, 4), columns=list('ABCD')) 

    in_queue = mp.Queue() 
    out_queue = mp.Queue() 

    # setup workers 
    numProc = 2 
    process = [mp.Process(target=worker, 
          args=(in_queue, out_queue)) for x in range(numProc)] 

    # run processes 
    for p in process: 
     p.start() 

    # iterator over rows 
    it = df.itertuples() 

    # fill queue and get data 
    # code fills the queue until a new element is available in the output 
    # fill blocks if no slot is available in the in_queue 
    for i in range(len(df)): 
     while out_queue.empty(): 
      # fill the queue 
      try: 
       row = next(it) 
       in_queue.put((row[0], row[1], row[2], row[3], row[4]), block=True) # row = (index, A, B, C, D) tuple 
      except StopIteration: 
       break 
     row_data = out_queue.get() 
     df.loc[row_data[0], "Result"] = row_data[1] 

    # signals for processes stop 
    for p in process: 
     in_queue.put('STOP') 

    # wait for processes to finish 
    for p in process: 
     p.join() 

Используя numProc = 2 он принимает 50sec за петлю, с numProc = 4 она в два раза быстрее.

 Смежные вопросы

  • Нет связанных вопросов^_^