У меня есть скрипт, который анализирует файлы xml с помощью ElementTree Path Evaluator. Он отлично работает, но для этого требуется много времени. Так что я попытался сделать многопоточную реализацию:Почему мой многопоточный парсер не многопоточен?
import fnmatch
import operator
import os
import lxml.etree
from nltk import FreqDist
from nltk.corpus import stopwords
from collections import defaultdict
from datetime import datetime
import threading
import Queue
STOPWORDS = stopwords.words('dutch')
STOPWORDS.extend(stopwords.words('english'))
DIR_NAME = 'A_DIRNAME'
PATTERN = '*.A_PATTERN'
def loadData(dir_name, pattern):
nohyphen_files = []
dir_names = []
dir_paths = []
for root, dirnames, filenames in os.walk(dir_name):
dir_names.append(dirnames)
dir_paths.append(root)
for filename in fnmatch.filter(filenames, pattern):
nohyphen_files.append(os.path.join(root, filename))
return nohyphen_files, dir_names, dir_paths
def freq(element_list, descending = True):
agglomerated = defaultdict(int)
for e in element_list:
agglomerated[e] += 1
return sorted(agglomerated.items(), key=operator.itemgetter(1), reverse=descending)
def lexDiv(amount_words):
return 1.0*len(set(amount_words))/len(amount_words)
def anotherFreq(list_types, list_words):
fd = FreqDist(list_types)
print 'top 10 most frequent types:'
for t, freq in fd.items()[:10]:
print t, freq
print '\ntop 10 most frequent words:'
agglomerated = defaultdict(int)
for w in list_words:
if not w.lower() in STOPWORDS:
agglomerated[w] += 1
sorted_dict = sorted(agglomerated.items(), key=operator.itemgetter(1),reverse=True)
print sorted_dict[:10]
def extractor(f):
print "check file: {}".format(f)
try:
# doc = lxml.etree.ElementTree(lxml.etree.XML(f))
doc = lxml.etree.ElementTree(file=f)
except lxml.etree.XMLSyntaxError, e:
print e
return
doc_evaluator = lxml.etree.XPathEvaluator(doc)
entities = doc_evaluator('//entity/*/externalRef/@reference')
places_dbpedia = doc_evaluator('//entity[contains(@type, "Schema:Place")]/*/externalRef/@reference')
non_people_dbpedia = set(doc_evaluator('//entity[not(contains(@type, "Schema:Person"))]'))
people = doc_evaluator('//entity[contains(@type, "Schema:Person")]/*/externalRef/@reference')
words = doc.xpath('text/wf[re:match(text(), "[A-Za-z-]")]/text()',\
namespaces={"re": "http://exslt.org/regular-expressions"})
unique_words = set(words)
other_tokens = doc.xpath('text/wf[re:match(text(), "[^A-Za-z-]")]/text()',\
namespaces={"re": "http://exslt.org/regular-expressions"})
amount_of_sentences = doc_evaluator('text/wf/@sent')[-1]
types = doc_evaluator('//term/@morphofeat')
longest_sentence = freq(doc.xpath('text/wf[re:match(text(), "[A-Za-z-]")]/@sent',\
namespaces={"re": "http://exslt.org/regular-expressions"}))[0]
top_people = freq([e.split('/')[-1] for e in people])[:10]
top_entities = freq([e.split('/')[-1] for e in entities])[:10]
top_places = freq([e.split('/')[-1] for e in places_dbpedia])[:10]
def worker():
while 1:
job_number = q.get()
extractor(job_number)
q.task_done() #this thread is complete, move on
if __name__ =='__main__':
startTime = datetime.now()
files, dirs, path = loadData(DIR_NAME, PATTERN)
startTime = datetime.now()
q = Queue.Queue()# job queue
for f in files:
q.put(f)
for i in range(20): #make 20 workerthreads ready
worker_thread = threading.Thread(target=worker)
worker_thread.daemon = True
worker_thread.start()
q.join()
print datetime.now() - startTime
Это делает что-то, но когда его синхронизации, не быстрее, чем обычная версия. Я думаю, что это имеет какое-то отношение к открытию и чтению файлов, делающих threader не многопоточным. Если я использую функцию, которая вместо синтаксического анализа xml-файла просто спит на пару секунд и что-то печатает, он работает, и это происходит намного быстрее. Что мне нужно для учета многопоточного XML-парсера?
с помощью 'threading' только делает параллелизации коды вы написали. На самом деле это не переводит ядра в CPU (исправьте меня, если я ошибаюсь здесь). А также чтение с одного диска будет бутылочной горловиной, так как сам диск может обрабатывать столько операций ввода-вывода за один раз. Вы можете получить несколько секунд даже минут в лучшем случае от parellizing (?!?) Вашего кода. Что вам нужно, это лучший объем хранилища и, вероятно, механизм кэширования для более быстрого чтения. Попробуйте сначала прочитать файлы в ОЗУ или БД в качестве кеша, а затем работать с ними. Или RAID ваши диски. – Torxed
@Torxed это не моя цель, чтобы он переходил через ядра в CPU. Параллелизация - это моя цель, точка, которую я пытаюсь сделать, заключается в том, что запуск функции, выполняющей какой-либо разбор, делает всю блокировку программирования. Предположим, что функция extractor просто спит на секунду и печатает что-то, чем она работает, но при анализе xml-файла она не работает параллельно. Знаете ли вы, связано ли это с XPathEvaluator и если есть обходной путь? –