0

Я вернулся с вопросом об asyncio. Я считаю, что это очень полезно (особенно из-за GIL с потоками), и я пытаюсь повысить производительность некоторых фрагментов кода.python 3 asyncio и MotorClient: как использовать двигатель с многопоточными и многократными циклами событий

Мое приложение делает следующее:

  • 1 Фоновой демон нити «А» получает события от подключенных клиентов и реагирующее на заполнение SetQueue (что просто очередь события, которое удаляет повторяющиеся идентификаторы) и выполнив некоторые вставки в БД. Я получаю этот демон от другого модуля (в основном я управляю обратным вызовом с момента получения события). В моем примере ниже я подменил это потоком, который я генерирую, и это просто просто заполняет очередь 20 элементами и имитирует вставки БД перед выходом.
  • 1 Фоновая демон поток «B» запускается (loop_start), и он просто перебирает работает до завершения сопрограмму, что:

    • распаковывает все элементы в очереди (если оно не пусто, в противном случае освободить контроль за е секунды, а затем сопрограммный повторно запущен)
    • для каждого идентификатора в очереди, запускает прикован сопрограммным что:

      • Создает и ждет задач, которая просто извлекает все соответствующую информацию для этого идентификатора из БД. Я использую MotorClient, который поддерживает asyncio, чтобы ждать в самой задаче.

      • Использует исполнитель пула процессов, чтобы запустить процесс на один идентификатор, который использует данные БД для выполнения интенсивной обработки ЦП.

  • Основной поток просто инициализирует db_client и принимает loop_start и команды останова.

То есть в основном это.

Теперь я стараюсь повысить производительность как можно больше.

Мой текущий вопрос заключается в использовании motor.motor_asyncio.AsyncioMotorClient() таким образом:

  1. Он инициализируется в основном потоке, и там я хочу, чтобы создать индексы
  2. темы «A» должен выполнить DB вставки
  3. Резьбе «B» необходимо выполнить поиск и считывание БД

Как это сделать? Motor заявляет, что он предназначен для приложения с одним потоком, где вы используете явно один цикл событий. Здесь я оказался вынужденным иметь два цикла событий, один в потоке «А» и один в потоке «В». Это не оптимально, но мне не удалось использовать один цикл событий с call_soon_threadsafe, сохраняя при этом одно и то же поведение ... и я считаю, что производительность мудрая я все еще получаю много с двумя циклами событий, которые управляют выпуском ядра gil bound cpu ,

Должен ли я использовать три разных экземпляра AsyncioMotorClient (по одному на поток) и использовать их, как указано выше?Я пробовал с разными ошибками при попытке.

Вот мой пример кода, который не включает в себя только инициализацию MotorClient в Asynchro-х __init__

import threading 
import asyncio 
import concurrent.futures 
import functools 
import os 
import time 
import logging 
from random import randint 
from queue import Queue 





# create logger 
logger = logging.getLogger(__name__) 
logger.setLevel(logging.DEBUG) 
# create file handler which logs even debug messages 
fh = logging.FileHandler('{}.log'.format(__name__)) 
fh.setLevel(logging.DEBUG) 
# create console handler with a higher log level 
ch = logging.StreamHandler() 
ch.setLevel(logging.DEBUG) 
# create formatter and add it to the handlers 
formatter = logging.Formatter('%(asctime)s - %(name)s - %(processName)s - %(threadName)s - %(levelname)s - %(message)s') 
fh.setFormatter(formatter) 
ch.setFormatter(formatter) 
# add the handlers to the logger 
logger.addHandler(fh) 
logger.addHandler(ch) 


class SetQueue(Queue): 
    """Queue that avoids duplicate entries while keeping an order.""" 
    def _init(self, maxsize): 
     self.maxsize = maxsize 
     self.queue = set() 

    def _put(self, item): 
     if type(item) is not int: 
      raise TypeError 
     self.queue.add(item) 

    def _get(self): 
     # Get always all items in a thread-safe manner 
     ret = self.queue.copy() 
     self.queue.clear() 
     return ret 


class Asynchro: 
    def __init__(self, event_queue): 
     self.__daemon = None 
     self.__daemon_terminate = False 
     self.__queue = event_queue 

    def fake_populate(self, size): 
     t = threading.Thread(target=self.worker, args=(size,)) 
     t.daemon = True 
     t.start() 

    def worker(self, size): 
     run = True 
     populate_event_loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(populate_event_loop) 
     cors = [self.worker_cor(i, populate_event_loop) for i in range(size)] 
     done, pending = populate_event_loop.run_until_complete(asyncio.wait(cors)) 
     logger.debug('Finished to populate event queue with result done={}, pending={}.'.format(done, pending)) 
     while run: 
      # Keep it alive to simulate something still alive (minor traffic) 
      time.sleep(5) 
      rand = randint(100, 200) 
      populate_event_loop.run_until_complete(self.worker_cor(rand, populate_event_loop)) 
      if self.__daemon_terminate: 
       logger.debug('Closed the populate_event_loop.') 
       populate_event_loop.close() 
       run = False 

    async def worker_cor(self, i, loop): 
     time.sleep(0.5) 
     self.__queue.put(i) 
     logger.debug('Wrote {} in the event queue that has now size {}.'.format(i, self.__queue.qsize())) 
     # Launch fake DB Insertions 
     #db_task = loop.create_task(self.fake_db_insert(i)) 
     db_data = await self.fake_db_insert(i) 
     logger.info('Finished to populate with id {}'.format(i)) 
     return db_data 

    @staticmethod 
    async def fake_db_insert(item): 
     # Fake some DB insert 
     logger.debug('Starting fake db insertion with id {}'.format(item)) 
     st = randint(1, 101)/100 
     await asyncio.sleep(st) 
     logger.debug('Finished db insertion with id {}, sleep {}'.format(item, st)) 
     return item 

    def loop_start(self): 
     logger.info('Starting the loop.') 
     if self.__daemon is not None: 
      raise Exception 
     self.__daemon_terminate = False 
     self.__daemon = threading.Thread(target=self.__daemon_main) 
     self.__daemon.daemon = True 
     self.__daemon.start() 

    def loop_stop(self): 
     logger.info('Stopping the loop.') 
     if self.__daemon is None: 
      raise Exception 
     self.__daemon_terminate = True 
     if threading.current_thread() != self.__daemon: 
      self.__daemon.join() 
      self.__daemon = None 
      logger.debug('Stopped the loop and closed the event_loop.') 

    def __daemon_main(self): 
     logger.info('Background daemon started (inside __daemon_main).') 
     event_loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(event_loop) 
     run, rc = True, 0 
     while run: 
      logger.info('Inside \"while run\".') 
      event_loop.run_until_complete(self.__cor_main()) 
      if self.__daemon_terminate: 
       event_loop.close() 
       run = False 
       rc = 1 
     return rc 

    async def __cor_main(self): 
     # If nothing in the queue release control for a bit 
     if self.__queue.qsize() == 0: 
      logger.info('Event queue is empty, going to sleep (inside __cor_main).') 
      await asyncio.sleep(10) 
      return 
     # Extract all items from event queue 
     items = self.__queue.get() 
     # Run asynchronously DB extraction and processing on the ids (using pool of processes) 
     with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor: 
      cors = [self.__cor_process(item, executor) for item in items] 
      logger.debug('Launching {} coroutines to elaborate queue items (inside __cor_main).'.format(len(items))) 
      done, pending = await asyncio.wait(cors) 
      logger.debug('Finished to execute __cor_main with result {}, pending {}' 
         .format([t.result() for t in done], pending)) 

    async def __cor_process(self, item, executor): 
     # Extract corresponding DB data 
     event_loop = asyncio.get_event_loop() 
     db_task = event_loop.create_task(self.fake_db_access(item)) 
     db_data = await db_task 
     # Heavy processing of data done in different processes 
     logger.debug('Launching processes to elaborate db_data.') 
     res = await event_loop.run_in_executor(executor, functools.partial(self.fake_processing, db_data, None)) 
     return res 

    @staticmethod 
    async def fake_db_access(item): 
     # Fake some db access 
     logger.debug('Starting fake db access with id {}'.format(item)) 
     st = randint(1, 301)/100 
     await asyncio.sleep(st) 
     logger.debug('Finished db access with id {}, sleep {}'.format(item, st)) 
     return item 

    @staticmethod 
    def fake_processing(db_data, _): 
     # fake some CPU processing 
     logger.debug('Starting fake processing with data {}'.format(db_data)) 
     st = randint(1, 101)/10 
     time.sleep(st) 
     logger.debug('Finished fake processing with data {}, sleep {}, process id {}'.format(db_data, st, os.getpid())) 
     return db_data 


def main(): 
    # Event queue 
    queue = SetQueue() 
    return Asynchro(event_queue=queue) 


if __name__ == '__main__': 
    a = main() 
    a.fake_populate(20) 
    time.sleep(5) 
    a.loop_start() 
    time.sleep(20) 
    a.loop_stop() 

ответ

1

В чем причина для запуска нескольких циклов событий?

Я предлагаю использовать один цикл в основном потоке, это родной режим для asyncio.

asyncio может запустить цикл в не основной теме в очень редких сценариях, но это не похоже на ваш случай.

+0

Тема «A» генерируется модулем mqtt (в реальном коде) и должна иметь возможность вызывать сопрограммы (например, «worker_cor») ... для этого требуется цикл событий в потоке. В то же время нить «B» - это фоновый демон, который проверяет очередь событий ... должен посылать сопрограммы, когда очередь не пуста ... еще один цикл! В потоке основного потока потребуется реальный код запустите некоторый db coroutine ... третий цикл! Как я могу использовать только один цикл событий с этими потоками? Не могли бы вы показать мне, изменив пример кода, каково будет ваше решение (действительно, один цикл означает отсутствие проблем с двигателем). – Bertone

+0

Нет дальнейших объяснений или деталей? Я пытался использовать call_soon_threadsafe повторно использовать один и тот же цикл, но у меня несколько проблем, в том числе: «Исключение в обратного вызова None() ручки: TraceBack (самый последний вызов последнего): Файл » /usr/lib/python3.5 /asyncio/events.py ", строка 125, в _run self._callback (* self._args) ТипError: объект« NoneType »не может быть вызван». Я в значительной степени застрял здесь. – Bertone

+0

Извините, я до сих пор не понимаю, зачем вам нужны дополнительные циклы. Если ваш код, выполняемый внутри цикла, блокируется - пожалуйста, не делайте этого, а вызывайте 'ran_in_executor'. Если ваша библиотека инкапсулирует создание цикла внутри - это очень плохой запах *. Храните все в главном контуре - и создавайте задачи асинхронного просмотра для длительных рабочих заданий. Вот и все. –