2015-03-17 4 views
2

У меня есть несколько файлов, отображаемых в память (как mmap объектов). В процессе их обработки каждый файл должен быть открыт несколько раз. Он работает нормально, если есть только один поток. Однако, когда я пытаюсь запустить задачу параллельно, возникает проблема: разные потоки не могут одновременно обращаться к одному и тому же файлу. Проблема иллюстрируется этим образцом:Множественный доступ к объектам mmap в python

import mmap, threading 

class MmapReading(threading.Thread): 
    def __init__(self): 
     threading.Thread.__init__(self) 
    def run(self): 
     for i in range(10000): 
      content = mmap_object.read().decode('utf-8') 
      mmap_object.seek(0) 
      if not content: 
       print('Error while reading mmap object') 

with open('my_dummy_file.txt', 'w') as f: 
    f.write('Hello world') 
with open('my_dummy_file.txt', 'r') as f: 
    mmap_object = mmap.mmap(f.fileno(), 0, prot = mmap.PROT_READ) 

threads = [] 
for i in range(64): 
    threads.append(MmapReading()) 
    threads[i].daemon = True 
    threads[i].start() 
for thread in threading.enumerate(): 
    if thread != threading.current_thread(): 
     thread.join() 

print('Mmap reading testing done!') 

Когда я запускаю этот скрипт, я получаю около 20 сообщений об ошибках.

Есть ли способ обойти эту проблему, кроме того, чтобы сделать 64 копии каждого файла (что бы потреблять слишком много памяти в моем случае)?

ответ

1

seek(0) не всегда выполняется перед тем, как другой поток вскочит и выполняет команду read().

  1. Сказать нить 1 выполняет чтение, чтение до конца файла; seek(0) имеет еще не было выполнено.
  2. Затем поток 2 выполняет чтение. Указатель файла в mmap по-прежнему в конце файла. read() поэтому возврат ''.
  3. Код ошибки обнаружения запускается, потому что content - ''.

Вместо использования read() вы можете использовать нарезку для достижения того же результата. Заменить:

content = mmap_object.read().decode('utf-8') 
    mmap_object.seek(0) 

с

content = mmap_object[:].decode('utf8') 

content = mmap_object[:mmap_object.size()] также работает.

Блокировка - это еще один способ, но в этом случае нет необходимости. Если вы хотите попробовать, вы можете использовать глобальный объект threading.Lock и передать его MmapReading при создании экземпляра. Храните объект блокировки в переменной экземпляра self.lock. Затем звоните self.lock.acquire() перед чтением/поиском и затем self.lock.release(). Вы получите очень заметное снижение производительности.

from threading import Lock 

class MmapReading(threading.Thread): 
    def __init__(self, lock): 
     self.lock = lock 
     threading.Thread.__init__(self) 

    def run(self): 
     for i in range(10000): 
      self.lock.acquire() 
      mmap_object.seek(0) 
      content = mmap_object.read().decode('utf-8') 
      self.lock.release() 
      if not content: 
       print('Error while reading mmap object') 

lock = Lock() 
for i in range(64): 
    threads.append(MmapReading(lock)) 
. 
. 
. 

Обратите внимание, что я изменил порядок чтения и поиска; вначале имеет смысл делать поиск, позиционируя указатель файла в начале файла.

+0

Я понимаю, что происходит. Вопрос в том, как его избежать? – Roman

+0

@Roman: уже ответил. – mhawke

1

Я не вижу, где вам нужно mmap для начала. mmap - метод обмена данными между процессами. Почему бы вам просто не прочитать содержимое в памяти (один раз!), Например. как список? Каждый поток будет получать доступ к списку с собственным набором итераторов. Кроме того, имейте в виду GIL в Python, который предотвращает любое ускорение при использовании многопоточности. Если вы хотите, что использование многопроцессорная (и затем mmaped файл имеет смысл, но на самом деле распределены между различными процессами)

+0

mmap также имеет некоторые другие преимущества, помимо совместного использования между процессами, например. эффективный случайный доступ к большому файлу, где ОС управляет поисковым вызовом. – mhawke

+0

Спасибо, что упомянул GIL. Однако из-за внешнего вызова приложения в моих реальных потоках я получаю необходимое ускорение. – Roman

+0

@mhawke Ах, интересно. – deets

0

Вопрос заключается в том, что единственный mmap_object в настоящее время распределяется между нитями так, что нить звонит чтения и до того, как он попадет в поиск, поток B также вызывает чтение, и поэтому не получает данных.

Что вам действительно нужно - это возможность дублировать объект mmap python без дублирования лежащего в основе mmap, но я не вижу этого.

Я думаю, что единственным возможным решением, не связанным с переписыванием реализации объекта, является использование блокировки (мьютекс и т. Д.) На объект mmap для предотвращения одновременного доступа двух потоков к одному и тому же объекту.