2017-02-21 17 views
0

У меня есть приложение tornado, которому необходимо запустить функцию блокировки на ProcessPoolExecutor. Эта функция блокировки использует библиотеку, которая испускает инкрементные результаты через события blinker. Я хотел бы собрать эти события и отправить их обратно в свое приложение tornado по мере их возникновения.Соберите инкрементные результаты от ProcessPoolExecutor от Tornado

Сначала tornado был идеален для этого случая использования, потому что он асинхронный. Я думал, что могу просто передать объект tornado.queues.Queue функции, которая будет запущена в пуле, а затем put() событий в этой очереди как часть моего обратного вызова события blinker.

Однако, прочитав документы tornado.queues.Queue, я узнал, что они не управляются через процессы, такие как multiprocessing.Queue и не являются потокобезопасными.

Есть ли способ получить эти события из pool по мере их возникновения? Должен ли я обернуть multiprocessing.Queue, чтобы он произвел Futures? Это вряд ли работает, поскольку я сомневаюсь, что внутренние компоненты multiprocessing совместимы с tornado.

[EDIT] Есть некоторые хорошие подсказки здесь: https://gist.github.com/hoffrocket/8050711

ответ

0

Вы можете сделать это проще, чем это. Вот сопрограммная, что представляет четыре медленные вызовы функций подпроцессов и их ждет:

from concurrent.futures import ProcessPoolExecutor 
from time import sleep 

from tornado import gen, ioloop 

pool = ProcessPoolExecutor() 


def calculate_slowly(x): 
    sleep(x) 
    return x 


async def parallel_tasks(): 
    # Create futures in a randomized order. 
    futures = [gen.convert_yielded(pool.submit(calculate_slowly, i)) 
       for i in [1, 3, 2, 4]] 

    wait_iterator = gen.WaitIterator(*futures) 
    while not wait_iterator.done(): 
     try: 
      result = await wait_iterator.next() 
     except Exception as e: 
      print("Error {} from {}".format(e, wait_iterator.current_future)) 
     else: 
      print("Result {} received from future number {}".format(
       result, wait_iterator.current_index)) 


ioloop.IOLoop.current().run_sync(parallel_tasks) 

Он выводит:

Result 1 received from future number 0 
Result 2 received from future number 2 
Result 3 received from future number 1 
Result 4 received from future number 3 

Вы можете видеть, что сопрограммная получает результаты в порядке их завершения, а не порядок их были представлены: будущий номер 1 решает после будущего числа 2, поскольку будущий номер 1 спал дольше. convert_yielded преобразует фьючерсы, возвращенные ProcessPoolExecutor, в фьючерсы, совместимые с Tornado, которые можно ожидать в сопрограмме.

Каждое будущее разрешает значение, возвращаемое методом вычислений: в этом случае это то же самое число, которое было передано в calculate_slowly, и столько же секунд, сколько и sleep_slowly sleeps.

Чтобы включить это в RequestHandler, попробовать что-то вроде этого:

class MainHandler(web.RequestHandler): 
    async def get(self): 
     self.write("Starting....\n") 
     self.flush() 

     futures = [gen.convert_yielded(pool.submit(calculate_slowly, i)) 
        for i in [1, 3, 2, 4]] 

     wait_iterator = gen.WaitIterator(*futures) 
     while not wait_iterator.done(): 
      result = await wait_iterator.next() 
      self.write("Result {} received from future number {}\n".format(
       result, wait_iterator.current_index)) 

      self.flush() 


if __name__ == "__main__": 
    application = web.Application([ 
     (r"/", MainHandler), 
    ]) 
    application.listen(8888) 
    ioloop.IOLoop.instance().start() 

Вы можете наблюдать, если вы curl localhost:8888, что сервер отвечает приращением на запрос клиента.

+0

Это не совсем то, что я имею в виду. Чтобы уточнить, у меня есть один медленный вызов функции. И этот вызов функции выдает несколько событий через 'blinker'. Я хотел бы собирать эти события и отправлять их обратно в основной поток торнадо.Я считаю, что код, который у вас здесь, выполняет четыре медленные функции параллельно, а не одну медленную функцию. – matthewatabet

1

Чтобы собрать что-либо, кроме возвращаемого значения задачи, переданного в ProcessPoolExecutor, вы должны использовать multiprocessing.Queue (или другой объект из библиотеки multiprocessing). Затем, поскольку multiprocessing.Queue предоставляет только синхронный интерфейс, вы должны использовать другой поток в родительском процессе для чтения из очереди (не дойдя до деталей реализации. Здесь есть дескриптор файла, который можно использовать здесь, но на этот раз мы будем игнорировать это это недокументировано и может быть изменено).

Вот быстрый непроверенных пример:

queue = multiprocessing.Queue() 
proc_pool = concurrent.futures.ProcessPoolExecutor() 
thread_pool = concurrent.futures.ThreadPoolExecutor() 

async def read_events(): 
    while True: 
     event = await thread_pool.submit(queue.get) 
     print(event) 

async def foo(): 
    IOLoop.current.spawn_callback(read_events) 
    await proc_pool.submit(do_something_and_write_to_queue) 
+0

Да, это более или менее то, что я закончил, но использовал модуль 'aioprocessing', который поддерживает неблокирующее' AioQueue.coro_get'. – matthewatabet