2009-07-15 6 views
34

У меня есть многопоточная программа, где я создаю функцию генератора, а затем передаю ее новым потокам. Я хочу, чтобы он был общим/глобальным по природе, поэтому каждый поток может получить следующее значение от генератора.Являются ли генераторы Threadsafe?

Можно ли использовать генератор, как это, или я столкнулся с проблемами/условиями доступа к совместно используемому генератору из нескольких потоков?

Если нет, есть ли лучший способ подойти к проблеме? Мне нужно что-то, что будет циклически перебираться по списку и вызывать следующее значение для того, что нить вызывает его.

ответ

49

Это не поточно-безопасный; одновременные вызовы могут чередовать и помешать локальным переменным.

Общий подход заключается в использовании шаблона Master-Slave (теперь он называется шаблоном фермера-работника на ПК). Создайте третий поток, который генерирует данные, и добавьте очередь между ведущим и подчиненными устройствами, где ведомые будут читать из очереди, и мастер напишет на него. Стандартный модуль очереди обеспечивает необходимую безопасность потока и организует блокировку ведущего устройства, пока ведомые устройства не будут готовы читать больше данных.

+7

Определенно +1 для Queue.Queue, отличный способ организовать систему потоков, когда это применимо (что в большинстве случаев и определенно для этой задачи). –

-7

Это зависит от того, питона реализации вы используете. В CPython GIL делает все операции над объектами python потокобезопасными, поскольку только один поток может выполнять код в любой момент времени.

http://en.wikipedia.org/wiki/Global_Interpreter_Lock

+1

«GIL делает все операции над объектами python threadafe» - да? все операции не являются атомарными –

+6

Это опасно вводит в заблуждение. GIL означает, что код Python не повредит состояние Python в многопоточной среде: вы не можете изменять потоки в середине операции байт-кода. (Например, вы можете изменить общий файл без его развращения.) Вы все равно можете изменять потоки между любыми двумя операциями байт-кода. –

40

Edited добавить планку ниже.

Вы можете обернуть генератор замком. Например,

import threading 
class LockedIterator(object): 
    def __init__(self, it): 
     self.lock = threading.Lock() 
     self.it = it.__iter__() 

    def __iter__(self): return self 

    def next(self): 
     self.lock.acquire() 
     try: 
      return self.it.next() 
     finally: 
      self.lock.release() 

gen = [x*2 for x in [1,2,3,4]] 
g2 = LockedIterator(gen) 
print list(g2) 

Замок принимает 50ms на моей системе, Очередь занимает 350 мс. Очередь полезна, когда у вас действительно есть очередь; например, если у вас есть входящие HTTP-запросы, и вы хотите поставить их в очередь для обработки рабочими потоками. (Это не соответствует модели итератора Python - как только итератор заканчивается, это делается.) Если у вас действительно есть итератор, LockedIterator - это более быстрый и простой способ сделать его потокобезопасным.

from datetime import datetime 
import threading 
num_worker_threads = 4 

class LockedIterator(object): 
    def __init__(self, it): 
     self.lock = threading.Lock() 
     self.it = it.__iter__() 

    def __iter__(self): return self 

    def next(self): 
     self.lock.acquire() 
     try: 
      return self.it.next() 
     finally: 
      self.lock.release() 

def test_locked(it): 
    it = LockedIterator(it) 
    def worker(): 
     try: 
      for i in it: 
       pass 
     except Exception, e: 
      print e 
      raise 

    threads = [] 
    for i in range(num_worker_threads): 
     t = threading.Thread(target=worker) 
     threads.append(t) 
     t.start() 

    for t in threads: 
     t.join() 

def test_queue(it): 
    from Queue import Queue 
    def worker(): 
     try: 
      while True: 
       item = q.get() 
       q.task_done() 
     except Exception, e: 
      print e 
      raise 

    q = Queue() 
    for i in range(num_worker_threads): 
     t = threading.Thread(target=worker) 
     t.setDaemon(True) 
     t.start() 

    t1 = datetime.now() 

    for item in it: 
     q.put(item) 

    q.join() 

start_time = datetime.now() 
it = [x*2 for x in range(1,10000)] 

test_locked(it) 
#test_queue(it) 
end_time = datetime.now() 
took = end_time-start_time 
print "took %.01f" % ((took.seconds + took.microseconds/1000000.0)*1000) 
+1

Менее эффективно, используя Queue.Queue, но красиво сделано. – gooli