2013-09-12 2 views
1

Я пытаюсь отправить информацию, извлеченную из строк большого файла, в процесс, запущенный на каком-то сервере.Как генерировать будущее, только если свободный рабочий доступен

Чтобы ускорить это, я хотел бы сделать это с некоторыми потоками параллельно.

Использование Питон 2,7 портировать из concurrent.futures Я попытался это:

f = open("big_file") 
with ThreadPoolExecutor(max_workers=4) as e: 
    for line in f: 
     e.submit(send_line_function, line) 
f.close() 

Однако это проблематично, так как все фьючерсы получить представленное мгновенно, так что моя машина работает из памяти, потому что весь файл загружается в память.

Мой вопрос в том, что есть простой способ представить новое будущее, когда свободный рабочий доступен.

+0

Это может быть быстрее, чтобы использовать функцию [ 'os.sendfile'] (http://docs.python.org/3.3/library/os.html#os.sendfile) для отправки передачи файлов через сокет. – Bakuriu

+0

Возможно, я должен был упомянуть, что этот пример немного упрощен ... на самом деле я отправляю информацию, извлеченную из каждой строки файла, в API JSON-REST. – tobigue

ответ

1

Вы можете перебирать куски файла, используя

for chunk in zip(*[f]*chunksize): 

(Это применение grouper recipe, который собирает элементы из итератора f в группы по размеру chunksize. Примечание: Это не потребляет цельные файл сразу, так как zip возвращает итератор в Python3.)


import concurrent.futures as CF 
import itertools as IT 
import logging 

logger = logging.getLogger(__name__) 
logging.basicConfig(level=logging.DEBUG, 
        format='[%(asctime)s %(threadName)s] %(message)s', 
        datefmt='%H:%M:%S') 

def worker(line): 
    line = line.strip() 
    logger.info(line) 

chunksize = 1024 
with CF.ThreadPoolExecutor(max_workers=4) as executor, open("big_file") as f: 
    for chunk in zip(*[f]*chunksize): 
     futures = [executor.submit(worker, line) for line in chunk] 
     # wait for these futures to complete before processing another chunk 
     CF.wait(futures) 

Теперь в комментариях вы справедливо указываете, что это не оптимально. Там может быть какой-то рабочий, который занимает много времени и держит целую кучу заданий.

Обычно, если каждый вызов работнику занимает примерно такое же количество времени, это не имеет большого значения. Тем не менее, вот способ продвижения файловой дескрипции по требованию. Он использует threading.Condition для уведомления sprinkler для продвижения дескриптора файла.

import logging 
import threading 
import Queue 

logger = logging.getLogger(__name__) 
logging.basicConfig(level=logging.DEBUG, 
        format='[%(asctime)s %(threadName)s] %(message)s', 
        datefmt='%H:%M:%S') 
SENTINEL = object() 

def worker(cond, queue): 
    for line in iter(queue.get, SENTINEL): 
     line = line.strip() 
     logger.info(line) 
     with cond: 
      cond.notify() 
      logger.info('notify') 

def sprinkler(cond, queue, num_workers): 
    with open("big_file") as f: 
     for line in f: 
      logger.info('advancing filehandle') 
      with cond: 
       queue.put(line) 
       logger.info('waiting') 
       cond.wait() 
     for _ in range(num_workers): 
      queue.put(SENTINEL) 

num_workers = 4 
cond = threading.Condition() 
queue = Queue.Queue() 
t = threading.Thread(target=sprinkler, args=[cond, queue, num_workers]) 
t.start() 

threads = [threading.Thread(target=worker, args=[cond, queue])] 
for t in threads: 
    t.start() 
for t in threads: 
    t.join() 
+0

Мне нравится, когда это происходит, но оно страдает от исходной проблемы. Основной поток по-прежнему пытается поставить в очередь файл entier. Как насчет одиночной очереди возврата, где каждый поток отправляет сообщение «done». Основной поток мог бы поставить, скажем, 16 элементов в очереди, а затем добавлять только элементы, поскольку он получает «сделанные» сообщения. – tdelaney

+0

Хорошо, в этом случае вы можете обработать файл в кусках, используя 'for chunk in IT.izip (* [f] * chunksize)'. – unutbu

+0

Спасибо за предложение! Я еще не смог проверить это, но это может ускорить меня для меня. Однако это не оптимальное решение, поскольку, насколько я вижу, все фьючерсы в куске должны заканчиваться до того, как будет отправлен новый кусок (что может привести к простоям работников). Также обратите внимание, что я использую Python 2.7 и backport of concurrent.futures, поэтому в этом случае, вероятно, нужно использовать 'izip'. – tobigue

 Смежные вопросы

  • Нет связанных вопросов^_^