Я вернулся с вопросом об asyncio. Я считаю, что это очень полезно (особенно из-за GIL с потоками), и я пытаюсь повысить производительность некоторых фрагментов кода.python 3 asyncio и MotorClient: как использовать двигатель с многопоточными и многократными циклами событий
Мое приложение делает следующее:
- 1 Фоновой демон нити «А» получает события от подключенных клиентов и реагирующее на заполнение SetQueue (что просто очередь события, которое удаляет повторяющиеся идентификаторы) и выполнив некоторые вставки в БД. Я получаю этот демон от другого модуля (в основном я управляю обратным вызовом с момента получения события). В моем примере ниже я подменил это потоком, который я генерирую, и это просто просто заполняет очередь 20 элементами и имитирует вставки БД перед выходом.
1 Фоновая демон поток «B» запускается (loop_start), и он просто перебирает работает до завершения сопрограмму, что:
- распаковывает все элементы в очереди (если оно не пусто, в противном случае освободить контроль за е секунды, а затем сопрограммный повторно запущен)
для каждого идентификатора в очереди, запускает прикован сопрограммным что:
Создает и ждет задач, которая просто извлекает все соответствующую информацию для этого идентификатора из БД. Я использую MotorClient, который поддерживает asyncio, чтобы ждать в самой задаче.
Использует исполнитель пула процессов, чтобы запустить процесс на один идентификатор, который использует данные БД для выполнения интенсивной обработки ЦП.
Основной поток просто инициализирует db_client и принимает loop_start и команды останова.
То есть в основном это.
Теперь я стараюсь повысить производительность как можно больше.
Мой текущий вопрос заключается в использовании motor.motor_asyncio.AsyncioMotorClient()
таким образом:
- Он инициализируется в основном потоке, и там я хочу, чтобы создать индексы
- темы «A» должен выполнить DB вставки
- Резьбе «B» необходимо выполнить поиск и считывание БД
Как это сделать? Motor заявляет, что он предназначен для приложения с одним потоком, где вы используете явно один цикл событий. Здесь я оказался вынужденным иметь два цикла событий, один в потоке «А» и один в потоке «В». Это не оптимально, но мне не удалось использовать один цикл событий с call_soon_threadsafe, сохраняя при этом одно и то же поведение ... и я считаю, что производительность мудрая я все еще получаю много с двумя циклами событий, которые управляют выпуском ядра gil bound cpu ,
Должен ли я использовать три разных экземпляра AsyncioMotorClient (по одному на поток) и использовать их, как указано выше?Я пробовал с разными ошибками при попытке.
Вот мой пример кода, который не включает в себя только инициализацию MotorClient в Asynchro-х __init__
import threading
import asyncio
import concurrent.futures
import functools
import os
import time
import logging
from random import randint
from queue import Queue
# create logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('{}.log'.format(__name__))
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(processName)s - %(threadName)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)
class SetQueue(Queue):
"""Queue that avoids duplicate entries while keeping an order."""
def _init(self, maxsize):
self.maxsize = maxsize
self.queue = set()
def _put(self, item):
if type(item) is not int:
raise TypeError
self.queue.add(item)
def _get(self):
# Get always all items in a thread-safe manner
ret = self.queue.copy()
self.queue.clear()
return ret
class Asynchro:
def __init__(self, event_queue):
self.__daemon = None
self.__daemon_terminate = False
self.__queue = event_queue
def fake_populate(self, size):
t = threading.Thread(target=self.worker, args=(size,))
t.daemon = True
t.start()
def worker(self, size):
run = True
populate_event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(populate_event_loop)
cors = [self.worker_cor(i, populate_event_loop) for i in range(size)]
done, pending = populate_event_loop.run_until_complete(asyncio.wait(cors))
logger.debug('Finished to populate event queue with result done={}, pending={}.'.format(done, pending))
while run:
# Keep it alive to simulate something still alive (minor traffic)
time.sleep(5)
rand = randint(100, 200)
populate_event_loop.run_until_complete(self.worker_cor(rand, populate_event_loop))
if self.__daemon_terminate:
logger.debug('Closed the populate_event_loop.')
populate_event_loop.close()
run = False
async def worker_cor(self, i, loop):
time.sleep(0.5)
self.__queue.put(i)
logger.debug('Wrote {} in the event queue that has now size {}.'.format(i, self.__queue.qsize()))
# Launch fake DB Insertions
#db_task = loop.create_task(self.fake_db_insert(i))
db_data = await self.fake_db_insert(i)
logger.info('Finished to populate with id {}'.format(i))
return db_data
@staticmethod
async def fake_db_insert(item):
# Fake some DB insert
logger.debug('Starting fake db insertion with id {}'.format(item))
st = randint(1, 101)/100
await asyncio.sleep(st)
logger.debug('Finished db insertion with id {}, sleep {}'.format(item, st))
return item
def loop_start(self):
logger.info('Starting the loop.')
if self.__daemon is not None:
raise Exception
self.__daemon_terminate = False
self.__daemon = threading.Thread(target=self.__daemon_main)
self.__daemon.daemon = True
self.__daemon.start()
def loop_stop(self):
logger.info('Stopping the loop.')
if self.__daemon is None:
raise Exception
self.__daemon_terminate = True
if threading.current_thread() != self.__daemon:
self.__daemon.join()
self.__daemon = None
logger.debug('Stopped the loop and closed the event_loop.')
def __daemon_main(self):
logger.info('Background daemon started (inside __daemon_main).')
event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(event_loop)
run, rc = True, 0
while run:
logger.info('Inside \"while run\".')
event_loop.run_until_complete(self.__cor_main())
if self.__daemon_terminate:
event_loop.close()
run = False
rc = 1
return rc
async def __cor_main(self):
# If nothing in the queue release control for a bit
if self.__queue.qsize() == 0:
logger.info('Event queue is empty, going to sleep (inside __cor_main).')
await asyncio.sleep(10)
return
# Extract all items from event queue
items = self.__queue.get()
# Run asynchronously DB extraction and processing on the ids (using pool of processes)
with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
cors = [self.__cor_process(item, executor) for item in items]
logger.debug('Launching {} coroutines to elaborate queue items (inside __cor_main).'.format(len(items)))
done, pending = await asyncio.wait(cors)
logger.debug('Finished to execute __cor_main with result {}, pending {}'
.format([t.result() for t in done], pending))
async def __cor_process(self, item, executor):
# Extract corresponding DB data
event_loop = asyncio.get_event_loop()
db_task = event_loop.create_task(self.fake_db_access(item))
db_data = await db_task
# Heavy processing of data done in different processes
logger.debug('Launching processes to elaborate db_data.')
res = await event_loop.run_in_executor(executor, functools.partial(self.fake_processing, db_data, None))
return res
@staticmethod
async def fake_db_access(item):
# Fake some db access
logger.debug('Starting fake db access with id {}'.format(item))
st = randint(1, 301)/100
await asyncio.sleep(st)
logger.debug('Finished db access with id {}, sleep {}'.format(item, st))
return item
@staticmethod
def fake_processing(db_data, _):
# fake some CPU processing
logger.debug('Starting fake processing with data {}'.format(db_data))
st = randint(1, 101)/10
time.sleep(st)
logger.debug('Finished fake processing with data {}, sleep {}, process id {}'.format(db_data, st, os.getpid()))
return db_data
def main():
# Event queue
queue = SetQueue()
return Asynchro(event_queue=queue)
if __name__ == '__main__':
a = main()
a.fake_populate(20)
time.sleep(5)
a.loop_start()
time.sleep(20)
a.loop_stop()
Тема «A» генерируется модулем mqtt (в реальном коде) и должна иметь возможность вызывать сопрограммы (например, «worker_cor») ... для этого требуется цикл событий в потоке. В то же время нить «B» - это фоновый демон, который проверяет очередь событий ... должен посылать сопрограммы, когда очередь не пуста ... еще один цикл! В потоке основного потока потребуется реальный код запустите некоторый db coroutine ... третий цикл! Как я могу использовать только один цикл событий с этими потоками? Не могли бы вы показать мне, изменив пример кода, каково будет ваше решение (действительно, один цикл означает отсутствие проблем с двигателем). – Bertone
Нет дальнейших объяснений или деталей? Я пытался использовать call_soon_threadsafe повторно использовать один и тот же цикл, но у меня несколько проблем, в том числе: «Исключение в обратного вызова None() ручки: TraceBack (самый последний вызов последнего): Файл » /usr/lib/python3.5 /asyncio/events.py ", строка 125, в _run self._callback (* self._args) ТипError: объект« NoneType »не может быть вызван». Я в значительной степени застрял здесь. –
Bertone
Извините, я до сих пор не понимаю, зачем вам нужны дополнительные циклы. Если ваш код, выполняемый внутри цикла, блокируется - пожалуйста, не делайте этого, а вызывайте 'ran_in_executor'. Если ваша библиотека инкапсулирует создание цикла внутри - это очень плохой запах *. Храните все в главном контуре - и создавайте задачи асинхронного просмотра для длительных рабочих заданий. Вот и все. –