У меня есть панда Dataframe, у которого есть миллионы строк, и я должен выполнять рядные операции. Поскольку у меня есть многоядерный процессор, я бы хотел ускорить этот процесс, используя Multiprocessing. То, как я хотел бы сделать это, - просто разделить блок данных в одинаковых размерах данных и обработать каждый из них в рамках отдельного процесса. До сих пор так хорошо ... Проблема в том, что мой код написан в стиле ООП, и я получаю ошибки Pickle с использованием многопроцессного пула. Что я делаю, я передаю ссылку на функцию класса self.X
в пул. Я также использую атрибуты класса в пределах X
(только для чтения). Я действительно не хочу возвращаться к функциональному стилю программирования ... Следовательно, возможно ли сделать многопроцессорность в ООП-envirnoment?Многопроцессорство с функциями класса и атрибутами класса
0
A
ответ
0
Это должно быть возможно, если все элементы вашего класса (которые вы переходите на подпроцессы) являются picklable. Это единственное, что вам нужно сделать. Если в вашем классе нет каких-либо элементов, то вы не можете передать его в пул. Даже если вы пройдете только self.x
, все остальное, как self.y
, должно быть выбрано.
Я делаю свою обработку панды Dataframe так:
import pandas as pd
import multiprocessing as mp
import numpy as np
import time
def worker(in_queue, out_queue):
for row in iter(in_queue.get, 'STOP'):
value = (row[1] * row[2]/row[3]) + row[4]
time.sleep(0.1)
out_queue.put((row[0], value))
if __name__ == "__main__":
# fill a DataFrame
df = pd.DataFrame(np.random.randn(1e5, 4), columns=list('ABCD'))
in_queue = mp.Queue()
out_queue = mp.Queue()
# setup workers
numProc = 2
process = [mp.Process(target=worker,
args=(in_queue, out_queue)) for x in range(numProc)]
# run processes
for p in process:
p.start()
# iterator over rows
it = df.itertuples()
# fill queue and get data
# code fills the queue until a new element is available in the output
# fill blocks if no slot is available in the in_queue
for i in range(len(df)):
while out_queue.empty():
# fill the queue
try:
row = next(it)
in_queue.put((row[0], row[1], row[2], row[3], row[4]), block=True) # row = (index, A, B, C, D) tuple
except StopIteration:
break
row_data = out_queue.get()
df.loc[row_data[0], "Result"] = row_data[1]
# signals for processes stop
for p in process:
in_queue.put('STOP')
# wait for processes to finish
for p in process:
p.join()
Таким образом, я не должен проходить большие куски DataFrames, и я не должен думать о том пригодны для консервирования элементов в моем классе.