Вы можете перебирать куски файла, используя
for chunk in zip(*[f]*chunksize):
(Это применение grouper recipe, который собирает элементы из итератора f
в группы по размеру chunksize
. Примечание: Это не потребляет цельные файл сразу, так как zip
возвращает итератор в Python3.)
import concurrent.futures as CF
import itertools as IT
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
format='[%(asctime)s %(threadName)s] %(message)s',
datefmt='%H:%M:%S')
def worker(line):
line = line.strip()
logger.info(line)
chunksize = 1024
with CF.ThreadPoolExecutor(max_workers=4) as executor, open("big_file") as f:
for chunk in zip(*[f]*chunksize):
futures = [executor.submit(worker, line) for line in chunk]
# wait for these futures to complete before processing another chunk
CF.wait(futures)
Теперь в комментариях вы справедливо указываете, что это не оптимально. Там может быть какой-то рабочий, который занимает много времени и держит целую кучу заданий.
Обычно, если каждый вызов работнику занимает примерно такое же количество времени, это не имеет большого значения. Тем не менее, вот способ продвижения файловой дескрипции по требованию. Он использует threading.Condition
для уведомления sprinkler
для продвижения дескриптора файла.
import logging
import threading
import Queue
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
format='[%(asctime)s %(threadName)s] %(message)s',
datefmt='%H:%M:%S')
SENTINEL = object()
def worker(cond, queue):
for line in iter(queue.get, SENTINEL):
line = line.strip()
logger.info(line)
with cond:
cond.notify()
logger.info('notify')
def sprinkler(cond, queue, num_workers):
with open("big_file") as f:
for line in f:
logger.info('advancing filehandle')
with cond:
queue.put(line)
logger.info('waiting')
cond.wait()
for _ in range(num_workers):
queue.put(SENTINEL)
num_workers = 4
cond = threading.Condition()
queue = Queue.Queue()
t = threading.Thread(target=sprinkler, args=[cond, queue, num_workers])
t.start()
threads = [threading.Thread(target=worker, args=[cond, queue])]
for t in threads:
t.start()
for t in threads:
t.join()
Это может быть быстрее, чтобы использовать функцию [ 'os.sendfile'] (http://docs.python.org/3.3/library/os.html#os.sendfile) для отправки передачи файлов через сокет. – Bakuriu
Возможно, я должен был упомянуть, что этот пример немного упрощен ... на самом деле я отправляю информацию, извлеченную из каждой строки файла, в API JSON-REST. – tobigue