2016-01-19 1 views
0

У меня есть поток, который порождает несколько потребительских процессов, которые выполняют некоторую тяжелую обработку в больших xml-файлах.проблемы с threading с использованием lxml.etree.iterparse

Мой проект для этого заключался в использовании простого одиночного потока для синтаксического анализа входящего потока на лету и создания новых объектов в класс multiprocessing.queues.queue, содержащийся в диспетчере процессов буфера. Менеджер процессов периодически проверяет размер очереди, и если потребление позволяет слишком быстро заполнить очередь, он запускает другого потребителя.

Моя проблема заключается в том, что код для присоединения к закрытой очереди при завершении обработки потока выполняется до того, как xml закончит разбираться !? Мне не кажется, что должен работать следующий код. Имейте в виду, что следующий код полностью однопоточный. Он не является ни назвать, ни использоваться любым кодом SMP:

clear_ok = False 
context = lxml.etree.iterparse(response, events=('end',)) 
for event, elem in context: 
    # Use QName to avoid specifying or stripping the namespace, which we don't need 
    if lxml.etree.QName(elem.tag).localname.upper() in obj_elem_map: 
     import_buffer.add(obj_elem_map[lxml.etree.QName(elem.tag).localname.upper()](elem=elem)) 
     clear_ok = True 
    if clear_ok: 
     elem.clear() #don't fill up a dom we don't need. 
     clear_ok = False 
results = import_buffer.finish() if block else import_buffer 

когда import_buffer.finish() называется происходит следующее:

def finish(self): 
    ''' 
    Notifies the buffer that we are done filling it. 
    This command binds to any processes still running and lets them 
    finish and then copies and flushes the managed results list. 
    ''' 
    # close the queue and wait until it is consumed 
    self.queue.close() 
    self.queue.join_thread() 
    # make sure the consumers are done consuming the queue 
    for csmr in self.running: 
     csmr.join() 
    # turn this into a list instead of a managed list 
    result = list(self.results_list) 
    del self.results_list[:] 
    if self.callback: 
     return self.callback(result) 
    else: 
     return result 

Однако я получаю исключение, что близко() была вызвана на queue до Я закончил синтаксический анализ?

Traceback (most recent call last): 
    File "./tests/test_smp_framework.py", line 103, in test_kqb_parser_fromfile 
    qkbobs = actions.queryQKB(file=fname) 
    File "/Users/skyleach/src/smpparser/smpparser/api_actions.py", line 339, in queryQKB 
    result = self.parseResponse(source=sourcefile)File "/Users/skyleach/src/smpparser/smpparser/smpapi.py", line 535, in parseResponse 
    import_buffer.add(obj_elem_map[lxml.etree.QName(elem.tag).localname.upper()](elem=elem)) 
    File "/Users/skyleach/src/smpparser/smpparser/smpapi.py", line 212, in add 
    self.queue.put(item) 
    File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/queues.py", line 81, in put 
    assert not self._closed 
AssertionError 

ответ

0

Это не было ошибкой lxml или ошибкой многопроцессорности, это была проблема инициализации назначения контекста. В основном я закодировал слишком быстро и сделал тупую ошибку.

В определении класса буфера я настраивал очередь, а не в функции init. Это означает, что все очереди для всех экземпляров класса были заданы в очереди, когда модуль был импортирован потоком импорта.

Это то, что я получаю для написания кода слишком быстро.

 Смежные вопросы

  • Нет связанных вопросов^_^