23

У меня есть реактор, который извлекает сообщения из RabbitMQ брокера и запускает методы работника для обработки этих сообщений в пуле процессов, что-то вроде этого:Как обрабатывать соединения SQLAlchemy в ProcessPool?

Reactor

Это реализуется с помощью питона asyncio, loop.run_in_executor() и concurrent.futures.ProcessPoolExecutor.

Теперь я хочу получить доступ к базе данных в рабочих методах с помощью SQLAlchemy. В основном обработка будет очень простой и быстрой операцией CRUD.

Реактор будет обрабатывать 10-50 сообщений в секунду в начале, поэтому недопустимо открывать новое соединение с базой данных для каждого запроса. Скорее я хотел бы поддерживать одно постоянное соединение для каждого процесса.

Мои вопросы: Как я могу это сделать? Могу ли я просто хранить их в глобальной переменной? Будет ли пул соединений SQA обработать это для меня? Как очистить, когда останавливается реактор?

[Update]

  • База данных MySQL с InnoDB.

Почему именно этот шаблон с пулом процессов?

В текущей реализации используется другой шаблон, в котором каждый потребитель работает в своем потоке. Так или иначе, это не очень хорошо. Есть уже около 200 потребителей, каждый из которых работает в своем потоке, и система быстро растет. Чтобы лучше масштабироваться, идея заключалась в том, чтобы разделить проблемы и потреблять сообщения в цикле ввода-вывода и делегировать обработку пулу. Конечно, производительность всей системы в основном связана с вводом-выводом. Однако CPU обрабатывает большие результирующие наборы.

Другой причиной было «простота использования». Хотя обработка соединений и потребление сообщений реализованы асинхронно, код рабочего может быть синхронным и простым.

Вскоре стало очевидным, что доступ к удаленным системам через постоянные сетевые подключения изнутри работника является проблемой. Это то, что предназначены для CommunicationChannels: внутри рабочего я могу передавать запросы на шину сообщений через эти каналы.

Одна из моих нынешних идей - обращаться с доступом к БД аналогичным образом: передавать утверждения через очередь к циклу событий, куда они отправляются в БД. Однако я не знаю, как это сделать с помощью SQLAlchemy. Где была бы точка входа? Объекты должны быть pickled, когда они проходят через очередь. Как получить такой объект из запроса SQA? Связь с базой данных должна работать асинхронно, чтобы не блокировать цикл события. Могу ли я использовать, например. aiomysql как драйвер базы данных для SQA?

+0

Итак, каждый рабочий - это собственный процесс? Невозможно совместно использовать соединения, поэтому, возможно, вам следует создать экземпляр каждого (локального) SQA-пула с максимальным лимитом 1 или 2 соединения. Затем наблюдайте, может быть, через базу данных (которая db?), Какие соединения порождаются/убиваются. Из-за плохого горения на этом - то, что вы ** не хотите делать, это реализовать свой собственный наивный пул на вершине SQA.Или попробуйте определить, закрыто ли соединение SQA или нет. –

+0

@JLPeyret: Я обновил вопрос с запрошенной вами информацией. И нет ... Я не планирую реализовать свой собственный пул соединений. – roman

+0

Итак, я думаю, что я помню, что соединения не могут пересекать процессы (в ОС смысл слова, чтобы отличать от потоков). И я знаю, что соединения вообще не соленые. Вы должны иметь возможность сообщать «мертвые» (строковые) sql-операторы, но я считаю, что вам будет непросто проходить вокруг db-подключений, я думаю, включая, вероятно, результаты SQA. Спекуляция на моем конце, но с некоторой степенью игры с нечетным использованием SQA, чтобы оправдать это. –

ответ

6

Ваше требование соединения одной базы данных в процессе процесса пула может быть легко выполнено, если кто-то позаботится о том, как вы экземпляр session, если вы работаете с ОРМ, в рабочих процессах.

Простое решение будет иметь глобальный session, который вы повторно использовать по запросам:

# db.py 
engine = create_engine("connection_uri", pool_size=1, max_overflow=0) 
DBSession = scoped_session(sessionmaker(bind=engine)) 

А на рабочем задачи:

# task.py 
from db import engine, DBSession 
def task(): 
    DBSession.begin() # each task will get its own transaction over the global connection 
    ... 
    DBSession.query(...) 
    ... 
    DBSession.close() # cleanup on task end 

Аргументы pool_size и max_overflowcustomize по умолчанию QueuePool используется create_engine. pool_size будет убедиться, что ваш процесс поддерживает только 1 соединение в каждом процессе в пуле процессов.

Если вы хотите его восстановить, вы можете использовать DBSession.remove(), который удалит сеанс из реестра и заставит его повторно подключиться к следующему использованию DBSession. Вы также можете использовать аргумент recyclePool, чтобы восстановить соединение после указанного количества времени.

Во время разработки/дебюнга вы можете использовать AssertionPool, который вызовет исключение, если из пула выведено более одного соединения, см. switching pool implementations о том, как это сделать.

+0

Итак, вы, в основном, думаете, что я не должен волноваться, потому что SQA-пул справится с этим прямо из коробки? Это было бы хорошо! Я перенастрою наше основное приложение с +200 потребителями и +20000 строк кода на новую архитектуру программного обеспечения в течение ближайших нескольких дней и посмотрю, работает ли оно. – roman

+0

@roman Удачи вам в вашем рефакторе, если у вас есть какие-либо вопросы, не стесняйтесь оставлять комментарии здесь, и если вы чувствуете, что я закрыл ваш вопрос, было бы неплохо отметить это как принятое :). – olokki

+0

Кажется, все хорошо! :) Этот раздел в документах следует упомянуть. Я думаю, http://docs.sqlalchemy.org/en/rel_1_1/core/pooling.html?highlight=multiprocessing#using-connection-pools-with-multiprocessing. Нужно уделять особое внимание многопроцессорности. – roman

0

@roman: Хороший вызов у ​​вас там.

я, находясь в подобной ситуации раньше, так вот мой 2 цента: если только этот потребитель «читать» и «написать» сообщение, не делать какую-либо реальные обрабатывающий этого, вы могли бы re-design этот потребитель в качестве потребителя/производителя, который будет потребляет сообщение, оно обработает сообщение, а затем отправит результат в другую очередь, эту очередь (обработанные сообщения, например) можно было бы прочитать 1..N non асинхронные процессы, которые открывали бы соединение БД в его собственном жизненном цикле.

Я могу расширить свой ответ, но я не знаю, подходит ли этот подход для ваших нужд, если да, я могу дать вам более подробную информацию о расширенном дизайне.

+0

Я рассматривал такой подход, но думаю, будет очень сложно правильно обработать транзакцию. Я думаю, что я не хочу пытаться создать собственный диспетчер распределенных транзакций. – roman

0

Подход, который мне очень понравился, заключается в использовании веб-сервера для обработки и масштабирования пула процессов. flask-sqlalchemy даже в состоянии по умолчанию будет поддерживать пул соединений и не закрывать каждое соединение в каждом цикле ответа запроса.

Исполнитель asyncio может просто вызвать конечные точки url для выполнения ваших функций. Дополнительным преимуществом является то, что, поскольку все процессы, выполняющие работу, находятся за URL-адресом, вы можете тривиально масштабировать свой рабочий пул по нескольким машинам, добавляя больше процессов через пушки или один из других методов, чтобы масштабировать простой сервер wsgi. Кроме того, вы получаете всю отказоустойчивую доброту.

Недостатком является то, что вы можете передавать больше информации по сети. Однако, как вы говорите, проблема связана с процессором, и вы, вероятно, будете передавать гораздо больше данных в базу данных и из нее.

+0

Когда я говорю, что проблема с процессором, я не имею в виду, что основная рабочая нагрузка связана с процессором! Это не так ... Как и в случае с другим подходом выше, я вижу серьезные проблемы с обработкой транзакций здесь. Для того, чтобы иметь сетевое соединение без учета состояния между бизнес-логикой и уровнем персистентности, страшно. – roman

 Смежные вопросы

  • Нет связанных вопросов^_^