У меня есть поток, который порождает несколько потребительских процессов, которые выполняют некоторую тяжелую обработку в больших xml-файлах.проблемы с threading с использованием lxml.etree.iterparse
Мой проект для этого заключался в использовании простого одиночного потока для синтаксического анализа входящего потока на лету и создания новых объектов в класс multiprocessing.queues.queue, содержащийся в диспетчере процессов буфера. Менеджер процессов периодически проверяет размер очереди, и если потребление позволяет слишком быстро заполнить очередь, он запускает другого потребителя.
Моя проблема заключается в том, что код для присоединения к закрытой очереди при завершении обработки потока выполняется до того, как xml закончит разбираться !? Мне не кажется, что должен работать следующий код. Имейте в виду, что следующий код полностью однопоточный. Он не является ни назвать, ни использоваться любым кодом SMP:
clear_ok = False
context = lxml.etree.iterparse(response, events=('end',))
for event, elem in context:
# Use QName to avoid specifying or stripping the namespace, which we don't need
if lxml.etree.QName(elem.tag).localname.upper() in obj_elem_map:
import_buffer.add(obj_elem_map[lxml.etree.QName(elem.tag).localname.upper()](elem=elem))
clear_ok = True
if clear_ok:
elem.clear() #don't fill up a dom we don't need.
clear_ok = False
results = import_buffer.finish() if block else import_buffer
когда import_buffer.finish()
называется происходит следующее:
def finish(self):
'''
Notifies the buffer that we are done filling it.
This command binds to any processes still running and lets them
finish and then copies and flushes the managed results list.
'''
# close the queue and wait until it is consumed
self.queue.close()
self.queue.join_thread()
# make sure the consumers are done consuming the queue
for csmr in self.running:
csmr.join()
# turn this into a list instead of a managed list
result = list(self.results_list)
del self.results_list[:]
if self.callback:
return self.callback(result)
else:
return result
Однако я получаю исключение, что близко() была вызвана на queue до Я закончил синтаксический анализ?
Traceback (most recent call last):
File "./tests/test_smp_framework.py", line 103, in test_kqb_parser_fromfile
qkbobs = actions.queryQKB(file=fname)
File "/Users/skyleach/src/smpparser/smpparser/api_actions.py", line 339, in queryQKB
result = self.parseResponse(source=sourcefile)File "/Users/skyleach/src/smpparser/smpparser/smpapi.py", line 535, in parseResponse
import_buffer.add(obj_elem_map[lxml.etree.QName(elem.tag).localname.upper()](elem=elem))
File "/Users/skyleach/src/smpparser/smpparser/smpapi.py", line 212, in add
self.queue.put(item)
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/queues.py", line 81, in put
assert not self._closed
AssertionError