2015-07-22 1 views
1

Я работаю над резьбовым приложением, где один поток будет подавать Queue с объектами, подлежащими изменению, и затем ряд других потоков будет считываться из очереди, вносить изменения и сохранять изменения.Peewee, SQLite и threading

Приложение не нуждается в большом количестве параллелизма, поэтому я хотел бы придерживаться базы данных SQLite. Вот небольшой пример, иллюстрирующий применение:

import queue 
import threading 
import peewee as pw 

db = pw.SqliteDatabase('test.db', threadlocals=True) 

class Container(pw.Model): 
    contents = pw.CharField(default="spam") 

    class Meta: 
     database = db 


class FeederThread(threading.Thread): 

    def __init__(self, input_queue): 
     super().__init__() 

     self.q = input_queue 

    def run(self): 
     containers = Container.select() 

     for container in containers: 
      self.q.put(container) 


class ReaderThread(threading.Thread): 

    def __init__(self, input_queue): 
     super().__init__() 

     self.q = input_queue 

    def run(self): 
     while True: 
      item = self.q.get() 

      with db.execution_context() as ctx: 
       # Get a new connection to the container object: 
       container = Container.get(id=item.id) 
       container.contents = "eggs" 
       container.save() 

      self.q.task_done() 


if __name__ == "__main__": 

    db.connect() 
    try: 
     db.create_tables([Container,]) 
    except pw.OperationalError: 
     pass 
    else: 
     [Container.create() for c in range(42)] 
    db.close() 

    q = queue.Queue(maxsize=10) 


    feeder = FeederThread(q) 
    feeder.setDaemon(True) 
    feeder.start() 

    for i in range(10): 
     reader = ReaderThread(q) 
     reader.setDaemon(True) 
     reader.start() 

    q.join() 

На основе Peewee документы многопоточность должны быть поддержаны для SQLite. Тем не менее, я продолжаю получать печально известную ошибку peewee.OperationalError: database is locked с выходом ошибки, указывающим на линию container.save().

Как мне обойти это?

ответ

3

Я был удивлен, увидев этот провал, поэтому я скопировал ваш код и поиграл с различными идеями. Я думаю, что проблема в том, что ExecutionContext() по умолчанию приведет к тому, что завершенный блок будет запущен в транзакции. Чтобы этого избежать, я прошел в False в чтениях.

Я также отредактировал фидер, чтобы использовать инструкцию SELECT перед тем, как положить материал в очередь (list(Container.select())).

Следующие работы для меня локально:

class FeederThread(threading.Thread): 

    def __init__(self, input_queue): 
     super(FeederThread, self).__init__() 

     self.q = input_queue 

    def run(self): 
     containers = list(Container.select()) 

     for container in containers: 
      self.q.put(container.id) # I don't like passing model instances around like this, personal preference though 

class ReaderThread(threading.Thread): 

    def __init__(self, input_queue): 
     super(ReaderThread, self).__init__() 

     self.q = input_queue 

    def run(self): 
     while True: 
      item = self.q.get() 

      with db.execution_context(False): 
       # Get a new connection to the container object: 
       container = Container.get(id=item) 
       container.contents = "nuggets" 
       with db.atomic(): 
        container.save() 

      self.q.task_done() 

if __name__ == "__main__": 

    with db.execution_context(): 
     try: 
      db.create_tables([Container,]) 
     except OperationalError: 
      pass 
     else: 
      [Container.create() for c in range(42)] 

    # ... same ... 

Я не вполне доволен этим, но мы надеемся, что это дает вам некоторые идеи.

Вот блоге я написал некоторое время назад, что есть несколько советов для получения более высокой параллельности с SQLite: http://charlesleifer.com/blog/sqlite-small-fast-reliable-choose-any-three-/

+0

Спасибо за ваш ответ! Threading по-прежнему немного вуду для меня, поэтому я рад, что это была не очевидная ошибка с моей стороны. Интересно, что употребление инструкции SELECT, по-видимому, является ключевым здесь - я не вижу никакой разницы, используя 'db.execution_context (False)' или 'с db.atomic()'. Фактически, употребляя инструкцию SELECT, мне даже не кажется, что нужно использовать «ExecutionContext()». Итак, я полагаю, что инструкция SELECT фактически блокировала базу данных? – digitaldingo

0

Вы пробовали режим WAL?

Improve INSERT-per-second performance of SQLite?

Вы должны быть очень осторожны, если у вас есть одновременный доступ к SQLite, так как вся база данных блокируется при записи сделаны, и хотя несколько читателей возможно, пишет будут заблокированы. Это несколько улучшилось с добавлением WAL в новых версиях SQLite.

и

Если вы используете несколько потоков, вы можете попробовать использовать общий кэш страниц, который позволит загруженные страницам быть разделен между потоками, которые могут избежать дорогостоящих вызовов ввода/вывода.

+0

Спасибо за ваши предложения! К сожалению, ни WAL, ни общий доступ кэш-памяти, похоже, здесь не меняются. – digitaldingo