2017-02-11 9 views
0

Я столкнулся с проблемой параллельных вычислений из большого файла csv. Проблема в том, что чтение из файла не может быть параллельно, но куски данных из файла могут передаваться для параллельных вычислений. Я попытался использовать Multiprocessing.Pool без результата (Pool.imap не принимает генератор выходных данных).данные параллельного процесса из файла

У меня есть генератор для чтения кусков данных из файла. Требуется ок. 3 сек. для извлечения одного фрагмента данных из файла. Этот кусок данных обрабатывается, если требуется ca. 2 сек. Я получаю 50 кусков данных из файла. В ожидании следующего фрагмента файла я мог бы вычислить предыдущий фрагмент «параллельно».

Let`s есть некоторый код в концепции (но не работает на практике) .:

def file_data_generator(path): 
    # file reading chunk by chunk 
    yield datachunk 

def compute(datachunk): 
    # some heavy computation 2.sec 
    return partial_result 

from multiprocessing import Pool 
p = Pool() 
result = p.imap(compute, file_data_generator(path)) # yield is the issue? 

Что я делаю неправильно? Любые другие инструменты следует использовать? It`s Python3.5

Простой код Концепция/скелет оценил :)

ответ

2

Вы были очень близки. Бит генератора с yield верен: imapделает принимает генератор в качестве аргумента и запускает next(), поэтому yield верен в этом контексте.

Что вам не хватает, так это то, что imap не блокирует, это значит, что вызов result = p.imap возвращается, хотя процессы еще не закончены. Вы либо должны сделать

p.close() 
p.join() 

А потом что-то делать с results в целом, или вы просто итерацию по результату. Вот рабочий пример:

from multiprocessing import Pool, Queue 

def compute(line): 
    # some heavy computation 2.sec 
    return len(line) 

def file_data_generator(path): 
    # file reading chunk by chunk 
    with open('book.txt') as f: 
     for line in f: 
      yield line.strip() 

if __name__ == '__main__': 
    p = Pool() 
    # start processes, they are still blocked because queue is empty 
    # results is a generator and is empty at the start 
    results = p.imap(compute, file_data_generator('book.txt')) 

    # now we tell pool that we finished filling the queue 
    p.close() 
    for res in results: 
     print(res) 
+0

Helo, спасибо за быстрый ответ, ваш код работает отлично! :). Этот вопрос является частью более серьезной проблемы. Может быть, вы могли бы указать мне направление для начала. Вопрос ниже –

+0

@MaciejskiPawel. Я вижу, что вы удалили свой ответ здесь, но где вы открыли новый вопрос? Я не вижу этого, и у меня уже был ответ :-) – hansaplast