2016-11-25 5 views
0

У меня есть панда Dataframe, у которого есть миллионы строк, и я должен выполнять рядные операции. Поскольку у меня есть многоядерный процессор, я бы хотел ускорить этот процесс, используя Multiprocessing. То, как я хотел бы сделать это, - просто разделить блок данных в одинаковых размерах данных и обработать каждый из них в рамках отдельного процесса. До сих пор так хорошо ... Проблема в том, что мой код написан в стиле ООП, и я получаю ошибки Pickle с использованием многопроцессного пула. Что я делаю, я передаю ссылку на функцию класса self.X в пул. Я также использую атрибуты класса в пределах X (только для чтения). Я действительно не хочу возвращаться к функциональному стилю программирования ... Следовательно, возможно ли сделать многопроцессорность в ООП-envirnoment?Многопроцессорство с функциями класса и атрибутами класса

ответ

0

Это должно быть возможно, если все элементы вашего класса (которые вы переходите на подпроцессы) являются picklable. Это единственное, что вам нужно сделать. Если в вашем классе нет каких-либо элементов, то вы не можете передать его в пул. Даже если вы пройдете только self.x, все остальное, как self.y, должно быть выбрано.

Я делаю свою обработку панды Dataframe так:

import pandas as pd 
import multiprocessing as mp 
import numpy as np 
import time 


def worker(in_queue, out_queue): 
    for row in iter(in_queue.get, 'STOP'): 
     value = (row[1] * row[2]/row[3]) + row[4] 
     time.sleep(0.1) 
     out_queue.put((row[0], value)) 

if __name__ == "__main__": 
    # fill a DataFrame 
    df = pd.DataFrame(np.random.randn(1e5, 4), columns=list('ABCD')) 

    in_queue = mp.Queue() 
    out_queue = mp.Queue() 

    # setup workers 
    numProc = 2 
    process = [mp.Process(target=worker, 
          args=(in_queue, out_queue)) for x in range(numProc)] 

    # run processes 
    for p in process: 
     p.start() 

    # iterator over rows 
    it = df.itertuples() 

    # fill queue and get data 
    # code fills the queue until a new element is available in the output 
    # fill blocks if no slot is available in the in_queue 
    for i in range(len(df)): 
     while out_queue.empty(): 
      # fill the queue 
      try: 
       row = next(it) 
       in_queue.put((row[0], row[1], row[2], row[3], row[4]), block=True) # row = (index, A, B, C, D) tuple 
      except StopIteration: 
       break 
     row_data = out_queue.get() 
     df.loc[row_data[0], "Result"] = row_data[1] 

    # signals for processes stop 
    for p in process: 
     in_queue.put('STOP') 

    # wait for processes to finish 
    for p in process: 
     p.join() 

Таким образом, я не должен проходить большие куски DataFrames, и я не должен думать о том пригодны для консервирования элементов в моем классе.

 Смежные вопросы

  • Нет связанных вопросов^_^