Я пытаюсь изучить многопроцессорность с помощью python. Я написал простой код, который должен кормить каждый процесс 1000 строк из входного файла txt. Моя основная функция читает строку, разбивает ее, а затем выполняет некоторые очень простые операции с элементами в строке. В конечном итоге результаты должны быть записаны в выходной файл.Многопроцессор в python использует только один процесс
Когда я запустил его, 4 процесса были правильно порождены, но только один процесс фактически работает с минимальным процессором. В результате код очень медленный и бросает вызов цели использования многопроцессорности в первую очередь. Я думаю, что у меня нет глобальной проблемы с списком, как в этом вопросе (python multiprocessing apply_async only uses one process), и я не думаю, что моя функция слишком тривиальна, как в этом случае (Python multiprocessing.Pool() doesn't use 100% of each CPU).
Я не могу понять, что я делаю неправильно, любая помощь/предложение приветствуются. Вот основной код:
import multiprocessing
import itertools
def myfunction(line):
returnlist=[]
list_of_elem=line.split(",")
elem_id=list_of_elem[1]
elem_to_check=list_of_elem[5]
ids=list_of_elem[2].split("|")
for x in itertools.permutations(ids,2):
if x[1] == elem_to_check:
returnlist.append(",".join([elem_id,x,"1\n"]))
else:
returnlist.append(",".join([elem_id,x,"0\n"]))
return returnlist
def grouper(n, iterable, padvalue=None):
return itertools.izip_longest(*[iter(iterable)]*n, fillvalue=padvalue)
if __name__ == '__main__':
my_data = open(r"my_input_file_to_be_processed.txt","r")
my_data = my_data.read().split("\n")
p = multiprocessing.Pool(4)
for chunk in grouper(1000, my_data):
results = p.map(myfunction, chunk)
for r in results:
with open (r"my_output_file","ab") as outfile:
outfile.write(r)
EDIT Я изменил свой код, следуя предложениям (удаление избыточных данных, предварительной обработки). Однако проблема все еще существует.
import multiprocessing
import itertools
def myfunction(line):
returnlist=[]
list_of_elem=line.split(",")
elem_id=list_of_elem[1]
elem_to_check=list_of_elem[5]
ids=list_of_elem[2].split("|")
for x in itertools.permutations(ids,2):
if x[1] == elem_to_check:
returnlist.append(",".join([elem_id,x,"1\n"]))
else:
returnlist.append(",".join([elem_id,x,"0\n"]))
return returnlist
if __name__ == '__main__':
my_data = open(r"my_input_file_to_be_processed.txt","r")
p = multiprocessing.Pool(4)
results = p.map(myfunction, chunk, chunksize=1000)
for r in results:
with open (r"my_output_file","ab") as outfile:
outfile.write(r)
All вашей внешней петли кажется бессмысленной меня 'p.map' распределят порции линии среди рабочих. И зачем разрезать данные вручную, когда 'Pool.map' уже имеет параметр' chunksize'? – robyschek
Думаю, вы не правильно готовите свои данные. вы должны только один раз вызвать «Pool.map» с чем-то вроде «p.map (func, dataset)», если ваш набор данных был ранее разбит на соответствующее количество фрагментов или используйте параметр 'chunksize', например' p.map (func, dataset, chunksize) ', если это не так. (Помещение «Pool.map» в ваш цикл заставляет вас вычислять каждый кусок один за другим, а не одновременно). – mgc
Спасибо вам за предложение относительно кусков. @robyscheck: Мне кажется, мне все же нужно разделить куски в одиночных строках, на которых работает основная функция, правильно? – user2447387