2011-01-22 4 views
0

так давайте представим большой XML-документ (размер файла> 100 мб), который мы хотим обработать с помощью cElementTree.iterparse.разделение и захват etree.iterparse с использованием многопроцессорности

но все те ядра, которые обещали нам Intel, были бы полезными, как их использовать? вот что я хочу:

from itertools import islice 
from xml.etree import ElementTree as etree 

tree_iter = etree.iterparse(open("large_file.xml", encoding="utf-8")) 

first = islice(tree_iter, 0, 10000) 
second = islice(tree_iter, 10000) 

parse_first() 
parse_second() 

Там, кажется, несколько проблем с этим, не в последнюю очередь в том, что итератор, возвращаемый iterparse(), кажется, сопротивляться нарезку.

Есть ли способ разделить разбор нагрузки большого XML-документ в двух или четыре отдельных задач (без загрузки всего документа в память? Цель бытия, то для выполнения задач на отдельных процессорах.

ответ

0

I . думаю, что вам нужен хороший ThreadPool с очередью задач для этого я нашел (и использовать) это очень хороший (это в Python3, но не должно быть слишком трудно преобразовать в 2.x):

# http://code.activestate.com/recipes/577187-python-thread-pool/ 

from queue import Queue 
from threading import Thread 

class Worker(Thread): 
    def __init__(self, tasks): 
     Thread.__init__(self) 
     self.tasks = tasks 
     self.daemon = True 
     self.start() 

    def run(self): 
     while True: 
      func, args, kargs = self.tasks.get() 
      try: func(*args, **kargs) 
      except Exception as exception: print(exception) 
      self.tasks.task_done() 

class ThreadPool: 
    def __init__(self, num_threads): 
     self.tasks = Queue(num_threads) 
     for _ in range(num_threads): Worker(self.tasks) 

    def add_task(self, func, *args, **kargs): 
     self.tasks.put((func, args, kargs)) 

    def wait_completion(self): 
     self.tasks.join() 

Теперь вы можете просто запустить цикл на iterparse и позволить threadpool делить работу на вас. Использование этого простое:

def executetask(arg): 
    print(arg) 

workers = threadpool.ThreadPool(4) # 4 is the number of threads 
for i in range(100): workers.add_task(executetask, i) 

workers.wait_completion() # not needed, only if you need to be certain all work is done before continuing 
+0

так что я предполагаю, что я вызываю work.add_task с функцией, которая анализирует каждый отдельный элемент? для elem в etree.parseiter(): workers.add_task (parseElem, elem)? проблема в том, что, поскольку синтаксический анализ является относительно простым, это не приводит к увеличению производительности. мне нужно разделить etree.parseiter() на управляемые куски: в идеале из 100 000 элементов на итерации дайте 25.000 каждому потоку в пуле. это возможно? –

+0

Это зависит от того, что вы делаете, но я думаю. – orlp