2017-01-16 16 views
1

У меня есть функция pool_map, которая может использоваться для ограничения количества одновременно выполняемых функций.Как сделать асинхронный пул отмененным?

Идея заключается в том, чтобы иметь coroutine function принимать один параметр, который отображается список возможных параметров, а также обернуть все вызовы функций в семафор приобретения, после чего лишь ограниченное число работает сразу:

from typing import Callable, Awaitable, Iterable, Iterator 
from asyncio import Semaphore 

A = TypeVar('A') 
V = TypeVar('V') 

async def pool_map(
    func: Callable[[A], Awaitable[V]], 
    arg_it: Iterable[A], 
    size: int=10 
) -> Generator[Awaitable[V], None, None]: 
    """ 
    Maps an async function to iterables 
    ensuring that only some are executed at once. 
    """ 
    semaphore = Semaphore(size) 

    async def sub(arg): 
     async with semaphore: 
      return await func(arg) 

    return map(sub, arg_it) 

Я модифицировал и не тестировал выше код для примера, но мой вариант работает хорошо. Например. Вы можете использовать его как это:

from asyncio import get_event_loop, coroutine, as_completed 
from contextlib import closing 

URLS = [...] 

async def run_all(awaitables): 
    for a in as_completed(awaitables): 
     result = await a 
     print('got result', result) 

async def download(url): ... 


if __name__ != '__main__': 
    pool = pool_map(download, URLS) 

    with closing(get_event_loop()) as loop: 
     loop.run_until_complete(run_all(pool)) 

Но проблема возникает, если есть исключение брошено в ожидании будущего. Я не вижу, как отменить все запланированные или все еще запущенные задачи, ни те, которые все еще ждут получения семафора.

Есть ли библиотека или элегантный строительный блок для этого, что я не знаю, или мне нужно самостоятельно строить все части? (Т.е. Semaphore с доступом к своим официантам, а as_finished, который обеспечивает доступ к своей очереди выполняемой задачи, ...)

ответ

1

Использование ensure_future получить Task вместо сопрограммы:

import asyncio 
from contextlib import closing 


def pool_map(func, args, size=10): 
    """ 
    Maps an async function to iterables 
    ensuring that only some are executed at once. 
    """ 
    semaphore = asyncio.Semaphore(size) 

    async def sub(arg): 
     async with semaphore: 
      return await func(arg) 

    tasks = [asyncio.ensure_future(sub(x)) for x in args] 

    return tasks 


async def f(n): 
    print(">>> start", n) 

    if n == 7: 
     raise Exception("boom!") 

    await asyncio.sleep(n/10) 

    print("<<< end", n) 
    return n 


async def run_all(tasks): 
    exc = None 
    for a in asyncio.as_completed(tasks): 
     try: 
      result = await a 
      print('=== result', result) 
     except asyncio.CancelledError as e: 
      print("!!! cancel", e) 
     except Exception as e: 
      print("Exception in task, cancelling!") 
      for t in tasks: 
       t.cancel() 
      exc = e 
    if exc: 
     raise exc 


pool = pool_map(f, range(1, 20), 3) 

with closing(asyncio.get_event_loop()) as loop: 
    loop.run_until_complete(run_all(pool)) 
1

Вот наивное решение, основанному на том, что cancel является не-оп, если задача уже завершена:

async def run_all(awaitables): 
    futures = [asyncio.ensure_future(a) for a in awaitables] 
    try: 
     for fut in as_completed(futures): 
      result = await fut 
      print('got result', result) 
    except: 
     for future in futures: 
      future.cancel() 
     await asyncio.wait(futures)