У меня есть реактор, который извлекает сообщения из RabbitMQ брокера и запускает методы работника для обработки этих сообщений в пуле процессов, что-то вроде этого:Как обрабатывать соединения SQLAlchemy в ProcessPool?
Это реализуется с помощью питона asyncio
, loop.run_in_executor()
и concurrent.futures.ProcessPoolExecutor
.
Теперь я хочу получить доступ к базе данных в рабочих методах с помощью SQLAlchemy. В основном обработка будет очень простой и быстрой операцией CRUD.
Реактор будет обрабатывать 10-50 сообщений в секунду в начале, поэтому недопустимо открывать новое соединение с базой данных для каждого запроса. Скорее я хотел бы поддерживать одно постоянное соединение для каждого процесса.
Мои вопросы: Как я могу это сделать? Могу ли я просто хранить их в глобальной переменной? Будет ли пул соединений SQA обработать это для меня? Как очистить, когда останавливается реактор?
[Update]
- База данных MySQL с InnoDB.
Почему именно этот шаблон с пулом процессов?
В текущей реализации используется другой шаблон, в котором каждый потребитель работает в своем потоке. Так или иначе, это не очень хорошо. Есть уже около 200 потребителей, каждый из которых работает в своем потоке, и система быстро растет. Чтобы лучше масштабироваться, идея заключалась в том, чтобы разделить проблемы и потреблять сообщения в цикле ввода-вывода и делегировать обработку пулу. Конечно, производительность всей системы в основном связана с вводом-выводом. Однако CPU обрабатывает большие результирующие наборы.
Другой причиной было «простота использования». Хотя обработка соединений и потребление сообщений реализованы асинхронно, код рабочего может быть синхронным и простым.
Вскоре стало очевидным, что доступ к удаленным системам через постоянные сетевые подключения изнутри работника является проблемой. Это то, что предназначены для CommunicationChannels: внутри рабочего я могу передавать запросы на шину сообщений через эти каналы.
Одна из моих нынешних идей - обращаться с доступом к БД аналогичным образом: передавать утверждения через очередь к циклу событий, куда они отправляются в БД. Однако я не знаю, как это сделать с помощью SQLAlchemy. Где была бы точка входа? Объекты должны быть pickled
, когда они проходят через очередь. Как получить такой объект из запроса SQA? Связь с базой данных должна работать асинхронно, чтобы не блокировать цикл события. Могу ли я использовать, например. aiomysql как драйвер базы данных для SQA?
Итак, каждый рабочий - это собственный процесс? Невозможно совместно использовать соединения, поэтому, возможно, вам следует создать экземпляр каждого (локального) SQA-пула с максимальным лимитом 1 или 2 соединения. Затем наблюдайте, может быть, через базу данных (которая db?), Какие соединения порождаются/убиваются. Из-за плохого горения на этом - то, что вы ** не хотите делать, это реализовать свой собственный наивный пул на вершине SQA.Или попробуйте определить, закрыто ли соединение SQA или нет. –
@JLPeyret: Я обновил вопрос с запрошенной вами информацией. И нет ... Я не планирую реализовать свой собственный пул соединений. – roman
Итак, я думаю, что я помню, что соединения не могут пересекать процессы (в ОС смысл слова, чтобы отличать от потоков). И я знаю, что соединения вообще не соленые. Вы должны иметь возможность сообщать «мертвые» (строковые) sql-операторы, но я считаю, что вам будет непросто проходить вокруг db-подключений, я думаю, включая, вероятно, результаты SQA. Спекуляция на моем конце, но с некоторой степенью игры с нечетным использованием SQA, чтобы оправдать это. –