Как правильно создавать и запускать параллельные задачи с помощью модуля python asyncio?
Я пытаюсь правильно понять и реализовать два одновременно работающих Task
объектов, используя Python 3 относительно новый asyncio
.
В двух словах asyncio, похоже, предназначен для обработки асинхронных процессов и параллельного выполнения Task
по циклу событий. Это способствует использованию await
(применяется в асинхронных функциях) как обратный вызов для ожидания и использования результата без блокировки цикла события. (Фьючерсы и обратные вызовы по-прежнему являются жизнеспособной альтернативой.)
Он также предоставляет класс asyncio.Task()
, специализированный подкласс Future
, предназначенный для переноса сопрограмм. Предпочтительно вызывается с помощью метода asyncio.ensure_future()
. Предполагаемое использование задач asyncio - позволить независимо запущенным задачам запускаться "одновременно" с другими задачами в пределах одного цикла событий. Я понимаю, что Tasks
связаны с циклом события, который затем автоматически продолжает управлять сопрограммой между операторами await
.
Мне нравится идея использовать одновременные Задачи без необходимости использовать один из классов Executor
, но у меня нет нашел много проработки в отношении реализации.
Вот как я это делаю сейчас:
import asyncio
print('running async test')
async def say_boo():
i = 0
while True:
await asyncio.sleep(0)
print('...boo {0}'.format(i))
i += 1
async def say_baa():
i = 0
while True:
await asyncio.sleep(0)
print('...baa {0}'.format(i))
i += 1
# OPTION 1: wrap in Task object
# -> automatically attaches to event loop and executes
boo = asyncio.ensure_future(say_boo())
baa = asyncio.ensure_future(say_baa())
loop = asyncio.get_event_loop()
loop.run_forever()
В случае попытки одновременного запуска двух задач цикла, я заметил, что, если у задачи нет внутреннего выражения await
, он застрянет в цикле while
, эффективно блокируя выполнение других задач (много как обычный цикл while
). Однако, как только Задачи должны (а) ждать, они, кажется, запускаются одновременно без проблем.
Таким образом, операторы await
, как представляется, обеспечивают цикл событий с плацдармом для переключения между задачами, давая эффект concurrency.
Пример вывода с внутренним await
:
running async test
...boo 0
...baa 0
...boo 1
...baa 1
...boo 2
...baa 2
Пример вывода без внутреннего await
:
...boo 0
...boo 1
...boo 2
...boo 3
...boo 4
Вопросы
Проходит ли эта реализация для "правильного" примера параллельных циклов Задачи в asyncio
?
Правильно ли, что единственный способ, которым это работает, - это Task
предоставить точку блокировки (выражение await
), чтобы цикл события мог манипулировать несколькими задачами?
Ответы
Ответ 1
Да, любая сопрограмма, запущенная внутри цикла событий, блокирует запуск других сопрограмм и задач, если только
- Вызывает другую сопрограмму с помощью
yield from
или await
(если используется Python 3.5 +).
- Возвращает.
Это потому, что asyncio
является однопоточным; единственный способ запуска цикла события - не продолжать активное выполнение другой сопрограммы. Использование yield from
/await
временно приостанавливает сопрограмму, предоставляя циклу событий возможность работать.
Ваш примерный код в порядке, но во многих случаях вам, вероятно, не нужен долговременный код, который не выполняет асинхронный ввод-вывод, запущенный внутри цикла событий. В таких случаях часто бывает полезно использовать BaseEventLoop.run_in_executor
для запуска кода в фоновом потоке или процессе. ProcessPoolExecutor
будет лучшим выбором, если ваша задача связана с CPU, ThreadPoolExecutor
будет использоваться, если вам нужно сделать некоторые операции ввода-вывода, которые не являются asyncio
-другими.
Ваши две петли, например, полностью привязаны к процессору и не разделяют какое-либо состояние, поэтому наилучшая производительность будет получена при использовании ProcessPoolExecutor
для параллельного запуска каждого цикла между CPU:
import asyncio
from concurrent.futures import ProcessPoolExecutor
print('running async test')
def say_boo():
i = 0
while True:
print('...boo {0}'.format(i))
i += 1
def say_baa():
i = 0
while True:
print('...baa {0}'.format(i))
i += 1
if __name__ == "__main__":
executor = ProcessPoolExecutor(2)
loop = asyncio.get_event_loop()
boo = asyncio.ensure_future(loop.run_in_executor(executor, say_boo))
baa = asyncio.ensure_future(loop.run_in_executor(executor, say_baa))
loop.run_forever()
Ответ 2
Вам необязательно использовать yield from x
для управления контуром событий.
В вашем примере я думаю, что правильным способом было бы сделать yield None
или эквивалентно простой yield
, а не yield from asyncio.sleep(0.001)
:
import asyncio
@asyncio.coroutine
def say_boo():
i = 0
while True:
yield None
print("...boo {0}".format(i))
i += 1
@asyncio.coroutine
def say_baa():
i = 0
while True:
yield
print("...baa {0}".format(i))
i += 1
boo_task = asyncio.async(say_boo())
baa_task = asyncio.async(say_baa())
loop = asyncio.get_event_loop()
loop.run_forever()
Coroutines - это просто старые генераторы Python.
Внутренний цикл событий asyncio
хранит запись этих генераторов и вызывает gen.send()
по каждому из них по одному в бесконечном цикле. Всякий раз, когда вы yield
, вызов gen.send()
завершается, и цикл может двигаться дальше. (Я упрощаю это, огляните https://hg.python.org/cpython/file/3.4/Lib/asyncio/tasks.py#l265 для фактического кода)
Тем не менее, я по-прежнему буду идти по маршруту run_in_executor
, если вам нужно делать вычисления с интенсивным вычислением без обмена данными.