Как ограничить параллелизм с Python asyncio?
Предположим, у нас есть несколько ссылок для загрузки, и каждая ссылка может занять разное количество времени для загрузки. И мне разрешено загружать, используя только 3 соединения. Теперь я хочу убедиться, что я делаю это эффективно, используя asyncio.
Вот что я пытаюсь достичь: в любой момент времени убедитесь, что у меня запущено как минимум 3 загрузки.
Connection 1: 1---------7---9---
Connection 2: 2---4----6-----
Connection 3: 3-----5---8-----
Цифры обозначают ссылки для скачивания, а дефисы - Ожидание загрузки.
Вот код, который я сейчас использую
from random import randint
import asyncio
count = 0
async def download(code, permit_download, no_concurrent, downloading_event):
global count
downloading_event.set()
wait_time = randint(1, 3)
print('downloading {} will take {} second(s)'.format(code, wait_time))
await asyncio.sleep(wait_time) # I/O, context will switch to main function
print('downloaded {}'.format(code))
count -= 1
if count < no_concurrent and not permit_download.is_set():
permit_download.set()
async def main(loop):
global count
permit_download = asyncio.Event()
permit_download.set()
downloading_event = asyncio.Event()
no_concurrent = 3
i = 0
while i < 9:
if permit_download.is_set():
count += 1
if count >= no_concurrent:
permit_download.clear()
loop.create_task(download(i, permit_download, no_concurrent, downloading_event))
await downloading_event.wait() # To force context to switch to download function
downloading_event.clear()
i += 1
else:
await permit_download.wait()
await asyncio.sleep(9)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main(loop))
finally:
loop.close()
И результат такой, как и ожидалось:
downloading 0 will take 2 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 2 second(s)
downloaded 0
downloading 4 will take 3 second(s)
downloaded 1
downloaded 3
downloading 5 will take 2 second(s)
downloading 6 will take 2 second(s)
downloaded 5
downloaded 6
downloaded 4
downloading 7 will take 1 second(s)
downloading 8 will take 1 second(s)
downloaded 7
downloaded 8
Но вот мои вопросы:
В данный момент я просто жду 9 секунд, чтобы основная функция работала до завершения загрузки. Есть ли эффективный способ ожидания завершения последней загрузки перед выходом из основной функции? (Я знаю там asyncio.wait, но мне нужно будет сохранить все ссылки на задачи, чтобы он работал)
Какая хорошая библиотека, которая выполняет такие задачи? Я знаю, что в JavaScript много асинхронных библиотек, но как насчет Python?
Редактировать:
2. Какая хорошая библиотека, которая заботится о распространенных асинхронных шаблонах? (Что-то вроде https://www.npmjs.com/package/async)
Ответы
Ответ 1
Вам в основном нужен пул задач фиксированного размера. asyncio
не поставляется с такой функциональностью из коробки, но ее легко создать: просто сохраняйте набор задач и не позволяйте ему превышать предел. Хотя в этом вопросе говорится о вашем нежелании идти по этому пути, код оказывается гораздо более элегантным:
async def download(code):
wait_time = randint(1, 3)
print('downloading {} will take {} second(s)'.format(code, wait_time))
await asyncio.sleep(wait_time) # I/O, context will switch to main function
print('downloaded {}'.format(code))
async def main(loop):
no_concurrent = 3
dltasks = set()
i = 0
while i < 9:
if len(dltasks) >= no_concurrent:
# Wait for some download to finish before adding a new one
_done, dltasks = await asyncio.wait(
dltasks, return_when=asyncio.FIRST_COMPLETED)
dltasks.add(loop.create_task(download(i)))
i += 1
# Wait for the remaining downloads to finish
await asyncio.wait(dltasks)
Альтернативой является создание фиксированного количества сопрограмм, выполняющих загрузку, во многом как пул потоков фиксированного размера, и подача их для работы с помощью asyncio.Queue
. Это устраняет необходимость вручную ограничивать количество загрузок, которое будет автоматически ограничено количеством вызывающих сопрограмм download()
:
# download() defined as above
async def download_from(q):
while True:
code = await q.get()
if code is None:
# pass on the word that we're done, and exit
await q.put(None)
break
await download(code)
async def main(loop):
q = asyncio.Queue()
dltasks = [loop.create_task(download_from(q)) for _ in range(3)]
i = 0
while i < 9:
await q.put(i)
i += 1
# Inform the consumers there is no more work.
await q.put(None)
await asyncio.wait(dltasks)
Что касается вашего другого вопроса, очевидным выбором будет aiohttp
.
Ответ 2
Если я не ошибаюсь, вы ищете asyncio.Semaphore. Пример использования:
import asyncio
from random import randint
async def download(code):
wait_time = randint(1, 3)
print('downloading {} will take {} second(s)'.format(code, wait_time))
await asyncio.sleep(wait_time) # I/O, context will switch to main function
print('downloaded {}'.format(code))
sem = asyncio.Semaphore(3)
async def safe_download(i):
async with sem: # semaphore limits num of simultaneous downloads
return await download(i)
async def main():
tasks = [
asyncio.ensure_future(safe_download(i)) # creating task starts coroutine
for i
in range(9)
]
await asyncio.gather(*tasks) # await moment all downloads done
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
Выход:
downloading 0 will take 3 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 3 second(s)
downloaded 1
downloaded 0
downloading 4 will take 2 second(s)
downloading 5 will take 1 second(s)
downloaded 5
downloaded 3
downloading 6 will take 3 second(s)
downloading 7 will take 1 second(s)
downloaded 4
downloading 8 will take 2 second(s)
downloaded 7
downloaded 8
downloaded 6
Пример асинхронной загрузки с aiohttp
можно найти здесь.
Ответ 3
Библиотека asyncio-pool делает именно то, что вам нужно.
https://pypi.org/project/asyncio-pool/
LIST_OF_URLS = ("http://www.google.com", "......")
pool = AioPool(size=3)
await pool.map(your_download_coroutine, LIST_OF_URLS)