У меня есть функция pool_map
, которая может использоваться для ограничения количества одновременно выполняемых функций.Как сделать асинхронный пул отмененным?
Идея заключается в том, чтобы иметь coroutine function принимать один параметр, который отображается список возможных параметров, а также обернуть все вызовы функций в семафор приобретения, после чего лишь ограниченное число работает сразу:
from typing import Callable, Awaitable, Iterable, Iterator
from asyncio import Semaphore
A = TypeVar('A')
V = TypeVar('V')
async def pool_map(
func: Callable[[A], Awaitable[V]],
arg_it: Iterable[A],
size: int=10
) -> Generator[Awaitable[V], None, None]:
"""
Maps an async function to iterables
ensuring that only some are executed at once.
"""
semaphore = Semaphore(size)
async def sub(arg):
async with semaphore:
return await func(arg)
return map(sub, arg_it)
Я модифицировал и не тестировал выше код для примера, но мой вариант работает хорошо. Например. Вы можете использовать его как это:
from asyncio import get_event_loop, coroutine, as_completed
from contextlib import closing
URLS = [...]
async def run_all(awaitables):
for a in as_completed(awaitables):
result = await a
print('got result', result)
async def download(url): ...
if __name__ != '__main__':
pool = pool_map(download, URLS)
with closing(get_event_loop()) as loop:
loop.run_until_complete(run_all(pool))
Но проблема возникает, если есть исключение брошено в ожидании будущего. Я не вижу, как отменить все запланированные или все еще запущенные задачи, ни те, которые все еще ждут получения семафора.
Есть ли библиотека или элегантный строительный блок для этого, что я не знаю, или мне нужно самостоятельно строить все части? (Т.е. Semaphore
с доступом к своим официантам, а as_finished
, который обеспечивает доступ к своей очереди выполняемой задачи, ...)