0

Итак, я использую процессы и очередь для поиска данных и поиска строк, которые имеют одну и ту же запись в разных столбцах. Я решил использовать многопроцессорную обработку, чтобы попытаться сделать так, что можно масштабировать для больших данных. Файл имеет 1000 строк и 10 точек данных в строке. Я читаю только 80 строк данных и программных киосков. 70 линий, и он отлично работает и при приличной скорости.Поиск списка многопроцессорных сетей

Мой вопрос в том, что я делаю неправильно или это ограничения с таким подходом, который я не идентифицировал? Код не идеален никакими средствами и, вероятно, сам по себе плох. Код выглядит следующим образом:

from multiprocessing import Process, Queue 
import random 

def openFile(file_name, k, division): 
    i = 0 
    dataSet = [] 
    with open(file_name) as f: 
     for line in f: 
      stripLine = line.strip('\n') 
      splitLine = stripLine.split(division) 
      dataSet += [splitLine] 
      i += 1 
      if(i == k): 
       break 

    return(dataSet) 

def setCombination(q,data1,data2): 
    newData = [] 
    for i in range(0,len(data1)): 
     for j in range(0, len(data2)): 
      if(data1[i][1] == data2[j][3]): 
       newData += data2[j] 
    q.put(newData) 

if __name__ == '__main__': 
    # Takes in the file, the length of the data to read in, and how the data is divided. 
    data = openFile('testing.txt', 80, ' ') 
    for i in range(len(data)): 
     for j in range(len(data[i])): 
      try: 
       data[i][j] = float(data[i][j]) 
      except ValueError: 
       pass 

    #print(data) 
    k = len(data)//10 
    q = Queue() 
    processes = [Process(target=setCombination, args=(q, data[k*x: k + k*x], data)) 
                   for x in range(10)] 
    for p in processes: 
     p.start() 

    # Exit the completed processes 
    for p in processes: 
     p.join() 

    saleSet = [q.get() for p in processes] 
    print('\n', saleSet) 

Файл testing.txt

+2

10k точки данных кажется, что слишком мало, чтобы извлечь выгоду из многопроцессорных. Попробуйте написать простейшее однопроцессорное однопоточное решение, которое вы можете сначала, а затем подумайте о параллелизации решения, используя конструкции более высокого уровня, такие как 'multiprocessing.Pool '. – Apalala

+1

при использовании конструкции 'with open', вам не нужно явно закрывать файл. Это преимущество оператора с заявлением – Aaron

+0

Вы буквально означаете, что программа зависает бесконечно с 80 строками данных, но не 70? – martineau

ответ

1

данных Похоже, что-то о том, что делает ваш код вызывает затор. Во время экспериментов я заметил, что 3 из 10 задач никогда не прекратятся, но, честно говоря, я действительно не знаю причины (причин) почему.

Хорошая новость заключается в том, что это легко исправить, просто удалив или отключив петлю

# Exit the completed processes 
for p in processes: 
    p.join() 

вы имеете в своем коде.

Вот полная версия коды с (в основном) только этим изменением в нем:

from multiprocessing import Process, Queue 

def openFile(file_name, k, division): 
    i = 0 
    dataSet = [] 
    with open(file_name) as f: 
     for line in f: 
      stripLine = line.strip('\n') 
      splitLine = stripLine.split(division) 
      dataSet += [splitLine] 
      i += 1 
      if i == k: 
       break 

    return dataSet 

def setCombination(q, data1, data2): 
    newData = [] 
    for i in range(len(data1)): 
     for j in range(len(data2)): 
      if data1[i][1] == data2[j][3]: 
       newData += data2[j] 
    q.put(newData) 

if __name__ == '__main__': 
    # Takes in the file, the length of the data to read in, and how the data is divided. 
    data = openFile('testing.txt', 80, ' ') 

    for i in range(len(data)): 
     for j in range(len(data[i])): 
      try: 
       data[i][j] = float(data[i][j]) 
      except ValueError: 
       pass 

    k = len(data) // 10 
    q = Queue() 
    processes = [Process(target=setCombination, args=(q, data[k*x: k*x+k], data)) 
        for x in range(10)] 
    for p in processes: 
     p.start() 

# NO LONGER USED (HANGS) 
# # Exit the completed processes 
# for p in processes: 
#  p.join() 

    # note: this works since by default, get() will block until it can retrieve something 
    saleSet = [q.get() for _ in processes] # a queue item should be added by each Process 
    print('\n', saleSet) 
+0

Можете ли вы объяснить, как вы смогли определить, как вы обнаружили, что 3 из 10 задач никогда не прекратятся? И есть ли способ запустить все задачи одновременно, а не цикл for, но достичь того же результата. Знал бы, что мне нужно изменить, если бы я сравнивал строки? – Eric

+0

Я идентифицировал их, заменив 'in p в процессах:', 'p.join()' цикл в вашем коде с тем, который напечатал строку '0' и '1', указывающую, какой из 10 процессов был ' p.is_alive() 'снова и снова, пока результаты не будут равны нулю. Это быстро стало бы «0000000111», но последние три задачи никогда не прекратятся, так что цикл никогда не заканчивается. Я не знаю, как начать их все сразу. но не думайте, что это что-то изменит. Функция 'setCombination()' будет сравнивать любой тип (ы) данных в списках 'data1' и' data2', переданных ему в качестве аргументов. – martineau

+0

Последующая мысль: см. Мою последнюю редакцию, которая заставляет ее работать, не переключаясь на использование 'JoinableQueue'. – martineau