Python aiohttp/asyncio - как обрабатывать возвращенные данные
Im в процессе перемещения некоторого синхронного кода в asyncio с использованием aiohttp. синхронный код занимал 15 минут, поэтому я надеюсь улучшить это.
У меня есть рабочий код, который получает данные из некоторых URL-адресов и возвращает тело каждого из них. Но это только против 1 лабораторного сайта, у меня есть 70 + фактических сайтов.
Итак, если у меня есть цикл для создания списка всех URL-адресов для всех сайтов, которые будут обрабатывать 700 URL-адресов в списке. Теперь, обрабатывая их, я не думаю, что это проблема?
Но делая "материал" с результатами, я не уверен, как программировать? У меня уже есть код, который будет делать "материал" для каждого возвращаемого результата, но я не уверен, как программировать его с правильным типом результата.
Когда запускает код, он обрабатывает все URL-адреса и в зависимости от времени запуска возвращает неизвестный порядок?
Нужна ли мне функция, которая будет обрабатывать любой тип результата?
import asyncio, aiohttp, ssl
from bs4 import BeautifulSoup
def page_content(page):
return BeautifulSoup(page, 'html.parser')
async def fetch(session, url):
with aiohttp.Timeout(15, loop=session.loop):
async with session.get(url) as response:
return page_content(await response.text())
async def get_url_data(urls, username, password):
tasks = []
# Fetch all responses within one Client session,
# keep connection alive for all requests.
async with aiohttp.ClientSession(auth=aiohttp.BasicAuth(username, password)) as session:
for i in urls:
task = asyncio.ensure_future(fetch(session, i))
tasks.append(task)
responses = await asyncio.gather(*tasks)
# you now have all response bodies in this variable
for i in responses:
print(i.title.text)
return responses
def main():
username = 'monitoring'
password = '*********'
ip = '10.10.10.2'
urls = [
'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'10.10.0.1'),
'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.10.0.1'),
'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'frontend.domain.com'),
'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'planner.domain.com'),
'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.10.10.1'),
'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.11.11.1'),
'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'10.12.12.60'),
'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.12.12.60'),
'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'lon-dc-01.domain.com'),
'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'lon-dc-01.domain.com'),
]
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(get_url_data(urls,username,password))
data = loop.run_until_complete(future)
print(data)
if __name__ == "__main__":
main()
Ответы
Ответ 1
Ваш код находится недалеко от знака. asyncio.gather
возвращает результаты в порядке аргументов, поэтому порядок сохраняется здесь, но page_content
не будет вызываться по порядку.
Несколько настроек:
Прежде всего, вам не нужно ensure_future
здесь. Создание задачи требуется только в том случае, если вы пытаетесь передать сопрограмму ее родителя, т.е. Если задача должна продолжаться, даже если созданная функция выполнена. Здесь вам нужно вместо вызова asyncio.gather
непосредственно с вашими сопрограммами:
async def get_url_data(urls, username, password):
async with aiohttp.ClientSession(...) as session:
responses = await asyncio.gather(*(fetch(session, i) for i in urls))
for i in responses:
print(i.title.text)
return responses
Но, вызывающий это, будет назначать все выборки одновременно и с большим количеством URL-адресов, это далеко не оптимально. Вместо этого вы должны выбрать максимум concurrency и обеспечить, чтобы в большинстве случаев X-выборки выполнялись в любое время. Чтобы реализовать это, вы можете использовать asyncio.Semaphore(20)
, этот семафор можно получить не более чем из 20 сопрограмм, так что остальные будут ждать, пока они не появятся.
CONCURRENCY = 20
TIMEOUT = 15
async def fetch(session, sem, url):
async with sem:
async with session.get(url) as response:
return page_content(await response.text())
async def get_url_data(urls, username, password):
sem = asyncio.Semaphore(CONCURRENCY)
async with aiohttp.ClientSession(...) as session:
responses = await asyncio.gather(*(
asyncio.wait_for(fetch(session, sem, i), TIMEOUT)
for i in urls
))
for i in responses:
print(i.title.text)
return responses
Таким образом, все выборки запускаются немедленно, но только 20 из них смогут получить семафор. Остальные блокируются при первой инструкции async with
и ждут, пока не будет выполнена другая выборка.
Я также заменил aiohttp.Timeout официальным асинхронным эквивалентом здесь.
Наконец, для фактической обработки данных, если вы ограничены временем процессора, asyncio, вероятно, вам не поможет. Вам нужно будет использовать ProcessPoolExecutor
здесь для параллелизации фактической работы с другим ЦП. run_in_executor
, вероятно, будет полезен.
Ответ 2
Вот пример с concurrent.futures.ProcessPoolExecutor
. Если он создан без указания max_workers
, реализация будет использовать вместо os.cpu_count
. Также обратите внимание, что asyncio.wrap_future
является общедоступным, но недокументированным. Альтернативно, AbstractEventLoop.run_in_executor
.
import asyncio
from concurrent.futures import ProcessPoolExecutor
import aiohttp
import lxml.html
def process_page(html):
'''Meant for CPU-bound workload'''
tree = lxml.html.fromstring(html)
return tree.find('.//title').text
async def fetch_page(url, session):
'''Meant for IO-bound workload'''
async with session.get(url, timeout = 15) as res:
return await res.text()
async def process(url, session, pool):
html = await fetch_page(url, session)
return await asyncio.wrap_future(pool.submit(process_page, html))
async def dispatch(urls):
pool = ProcessPoolExecutor()
async with aiohttp.ClientSession() as session:
coros = (process(url, session, pool) for url in urls)
return await asyncio.gather(*coros)
def main():
urls = [
'https://stackoverflow.com/',
'https://serverfault.com/',
'https://askubuntu.com/',
'https://unix.stackexchange.com/'
]
result = asyncio.get_event_loop().run_until_complete(dispatch(urls))
print(result)
if __name__ == '__main__':
main()