Другая документация Asycnio еще менее полезна. Там нет информации о том, как это работает, только некоторые рекомендации о том, как ее использовать, которые также иногда вводят в заблуждение/очень плохо написаны.
Я знаком с Go-реализацией сопрограмм и очень надеюсь, что Python сделает то же самое. Если бы это было так, то код, который я нашел в указанной выше ссылке, сработал бы. Так как это не так, я теперь пытаюсь понять, почему. До сих пор я догадываюсь, пожалуйста, поправьте меня, где я ошибаюсь:
Если мое предположение окажется правильным: тогда у меня есть проблема. Как действительно в действительности I/O происходит в этом сценарии? В отдельном потоке? Выключен ли весь интерпретатор, а ввод/вывод происходит за пределами интерпретатора? Что именно подразумевается под I/O? Если моя процедура python называется процедурой C open()
, и она, в свою очередь, отправляет прерывание в ядро, отказывается от управления им, как интерпретатор Python знает об этом и может продолжить запуск другого кода, в то время как код ядра делает фактический ввод /O и до тех пор, пока он не пробудет процедуру Python, которая первоначально отправила прерывание? Как Python может интерпретировать в принципе, знать об этом?
Ответ 1
Как работает асинчо?
Прежде чем ответить на этот вопрос, нам нужно понять несколько базовых терминов, пропустите их, если вы уже знаете их.
Генераторы - это объекты, которые позволяют нам приостановить выполнение функции python. Пользовательский куратором генераторы реализовать, используя ключевое слово yield
. Создавая нормальную функцию, содержащую ключевое слово yield
, мы превращаем эту функцию в генератор:
>>> def test():
... yield 1
... yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
Как вы можете видеть, вызов next()
в генераторе заставляет интерпретатор загружать тестовый фрейм и возвращать значение yield
значения. Вызов next()
снова заставит кадр снова загрузиться в стек интерпретатора и продолжить с yield
другого значения.
В третий раз вызывается next()
, наш генератор был закончен, и StopIteration
был брошен.
Общение с генератором
Менее известной особенностью генераторов является тот факт, что вы можете общаться с ними, используя два метода: send()
и throw()
.
>>> def test():
... val = yield 1
... print(val)
... yield 2
... yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 4, in test
Exception
При вызове gen.send()
значение передается как возвращаемое значение из ключевого слова yield
.
gen.throw()
с другой стороны, позволяет бросать Исключения внутри генераторов, причем исключение, поднятое на том же yield
было вызвано.
Возвращаемые значения от генераторов
Возврат значения из генератора приводит к тому, что значение помещается внутри исключения StopIteration
. Впоследствии мы можем восстановить значение из исключения и использовать его для наших нужд.
>>> def test():
... yield 1
... return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
... next(gen)
... except StopIteration as exc:
... print(exc.value)
...
abc
Вот, новое ключевое слово: yield from
Python 3.4 появился с добавлением нового ключевого слова: yield from
. То, что это ключевое слово позволяет нам делать, - это передать любые next()
, send()
и throw()
в самый внутренний вложенный генератор. Если внутренний генератор возвращает значение, это также возвращаемое значение yield from
:
>>> def inner():
... print((yield 2))
... return 3
...
>>> def outer():
... yield 1
... val = yield from inner()
... print(val)
... yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen)
2
>>> gen.send("abc")
abc
3
4
Объединяя все это
После ввода нового yield from
ключевого слова yield from
Python 3.4 мы теперь могли создавать генераторы внутри генераторов, которые точно так же, как туннель, передают данные назад и вперед от самых внутренних до самых внешних генераторов. Это породило новое значение для генераторов - сопрограммы.
Coroutines - это функции, которые можно остановить и возобновить во время запуска. В Python они определяются с использованием ключевого слова async def
. Подобно генераторам, они тоже используют свою собственную форму yield from
которой await
. До того, как в Python 3.5 были введены async
и await
, мы создали сопрограммы точно так же, как были созданы генераторы (с yield from
вместо await
).
async def inner():
return 1
async def outer():
await inner()
Как и каждый итератор или генератор, реализующий метод __iter__()
, сопрограммы реализуют __await__()
что позволяет им продолжать каждый раз, await coro
.
Там хорошая диаграмма последовательности внутри документов Python, которые вы должны проверить.
В asyncio, помимо функций coroutine, у нас есть два важных объекта: задачи и фьючерсы.
Фьючерсы - это объекты, которые __await__()
метод __await__()
, и их задача состоит в том, чтобы удерживать определенное состояние и результат. Состояние может быть одним из следующих:
- PENDING - будущее не имеет никакого результата или исключения.
- CANCELED - будущее было отменено с использованием
fut.cancel()
- FINISHED - будущее было закончено либо с помощью набора результатов с использованием
fut.set_result()
либо с помощью набора исключений, использующего fut.set_exception()
Результат, как вы уже догадались, может быть либо объектом Python, который будет возвращен, либо исключением, которое может быть поднято.
Еще одна важная особенность future
объектов заключается в том, что они содержат метод add_done_callback()
. Этот метод позволяет вызывать функции, как только задача будет выполнена, - вызвала ли она исключение или закончила.
Объектами задачи являются специальные фьючерсы, которые обтекают сопрограммы и общаются с наиболее внутренними и внешними сопрограммами. Каждый раз, когда сопрограмма await
будущее, будущее передается обратно на задание (как и в случае yield from
), и задача получает его.
Затем задача привязана к будущему. Он делает это, вызывая add_done_callback()
в будущем. С этого момента, если будущее будет когда-либо сделано, либо отменив, либо исключив, либо передав объект Python, будет вызван обратный вызов задачи, и он вернется к существованию.
Asyncio
Ответ на последний вопрос, о котором мы должны ответить, - это вопрос о том, как реализуется ИО?
Глубоко внутри asyncio у нас есть цикл событий. Цикл событий задач. Задача цикла события - вызывать задачи каждый раз, когда они готовы, и координировать все эти усилия на одном рабочем компьютере.
Часть IO цикла события построена на одной важной функции, называемой select
. Select - это функция блокировки, реализованная под операционной системой, которая позволяет ждать сокетов для входящих или исходящих данных. При получении данных он просыпается и возвращает сокеты, которые получили данные, или сокеты, которые готовы для записи.
Когда вы пытаетесь получить или отправить данные через сокет через asyncio, то, что на самом деле происходит ниже, является то, что сокет сначала проверяется, есть ли у него какие-либо данные, которые можно немедленно прочитать или отправить. Если .send()
заполнен или .recv()
пуст, сокет зарегистрирован в функции select
(просто добавив его в один из списков, rlist
для recv
и wlist
для send
) и соответствующий функция await
новый созданный future
объект, привязанный к этому сокету.
Когда все доступные задачи ждут фьючерсы, цикл цикла вызывает select
и ждет. Когда один из сокетов имеет входящие данные или send
буфер, очищается, asyncio проверяет будущий объект, привязанный к этому сокету, и устанавливает его.
Теперь все волшебство происходит. Будущее настроено на выполнение, задача, добавленная ранее с помощью add_done_callback()
, возвращается к жизни и вызывает .send()
на сопрограмме coroutine, которая возвращает внутреннюю .send()
coroutine (из-за цепи await
), и вы читаете вновь полученные данные из соседнего буфера, который был пропущен.
Цепочка метода снова, в случае recv()
:
-
select.select
ждет. - Готовый сокет с данными возвращается.
- Данные из сокета перемещаются в буфер.
-
future.set_result()
. - Задача, которая добавила себя с помощью
add_done_callback()
, теперь разбужена. - Задача вызывает
.send()
на сопрограмме, которая проходит полностью во внутреннюю часть coroutine и пробуждает ее. - Данные считываются из буфера и возвращаются нашему скромному пользователю.
Таким образом, asyncio использует возможности генератора, которые позволяют приостанавливать и возобновлять функции. Он использует yield from
возможностей, которые позволяют передавать данные назад и четверть от самого внутреннего генератора до самого внешнего. Он использует все те, чтобы остановить выполнение функции, пока он ждет завершения ввода-вывода (с помощью функции select
ОС).
И самое лучшее? В то время как одна функция приостановлена, другая может запускаться и чередоваться с тонкой тканью, которая является асинхронной.
Ответ 2
Говорить об async/await
и asyncio
- это не одно и то же. Первая - это фундаментальная низкоуровневая конструкция (сопрограммы), а в дальнейшем - библиотека, использующая эти конструкции. И наоборот, нет единого окончательного ответа.
Ниже приведено общее описание того, как работают библиотеки async/await
и asyncio
-like. То есть могут быть другие трюки сверху (есть...), но они несущественны, если вы сами их не создаете. Разница должна быть незначительной, если вы уже не знаете достаточно, чтобы не задавать такой вопрос.
1. Корутины против подпрограмм в ореховой оболочке
Подобно подпрограммам (функции, процедуры,...), сопрограммы (генераторы,...) являются абстракцией стека вызовов и указателя инструкции: существует стек выполнения частей кода, и каждый из них имеет определенную инструкцию.
Различие def
против async def
просто для ясности. Фактическая разница - это return
против yield
. Из этого await
или yield from
отличия от индивидуальных вызовов до целых стеков.
1.1. подпрограммы
Подпрограмма представляет собой новый уровень стека для хранения локальных переменных и единственный обход его инструкций для достижения цели. Рассмотрим такую подпрограмму:
def subfoo(bar):
qux = 3
return qux * bar
Когда вы запускаете это, это означает
- выделить пространство стека для
bar
и qux
- рекурсивно выполнить первый оператор и перейти к следующему утверждению
- один раз при
return
, вытащите его значение в вызывающий стек - очистить стек (1.) и указатель инструкции (2.)
Примечательно, что 4. означает, что подпрограмма всегда начинается в одном и том же состоянии. По завершении все эксклюзивное для самой функции теряется. Функция не может быть возобновлена, даже если есть инструкции после return
.
root -\
: \- subfoo --\
:/--<---return --/
|
V
1.2. Корутины как постоянные подпрограммы
Корутин похож на подпрограмму, но может выйти без разрушения ее состояния. Рассмотрим сопрограмму следующим образом:
def cofoo(bar):
qux = yield bar # yield marks a break point
return qux
Когда вы запускаете это, это означает
- выделить пространство стека для
bar
и qux
- рекурсивно выполнить первый оператор и перейти к следующему утверждению
- один раз с
yield
, толкает его значение в вызывающий стек, но сохраняет указатель стека и инструкции - после вызова в
yield
, восстановить указатель стека и инструкции и нажать аргументы qux
- один раз при
return
, вытащите его значение в вызывающий стек - очистить стек (1.) и указатель инструкции (2.)
Обратите внимание на добавление 2.1 и 2.2 - сопрограмма может быть приостановлена и возобновлена в заранее определенных точках. Это похоже на то, как подпрограмма приостановлена во время вызова другой подпрограммы. Разница в том, что активная сопрограмма строго не привязана к стеке вызовов. Вместо этого приостановленная сопрограмма является частью отдельного изолированного стека.
root -\
: \- cofoo --\
:/--<+--yield --/
| :
V :
Это означает, что приостановленные сопрограммы могут свободно храниться или перемещаться между стеками. Любой стек вызовов, имеющий доступ к сопрограмме, может решить возобновить ее.
1.3. Перемещение стека вызовов
До сих пор наша сопрограмма только снижала стек вызовов с yield
. Подпрограмма может спуститься вверх и вверх по стеку вызовов с return
и ()
. Для полноты сопрограммы также нуждаются в механизме для поднятия стека вызовов. Рассмотрим сопрограмму следующим образом:
def wrap():
yield 'before'
yield from cofoo()
yield 'after'
Когда вы запускаете его, это означает, что он по-прежнему выделяет указатель стека и инструкции как подпрограмму. Когда он приостанавливается, это все равно похоже на сохранение подпрограммы.
Однако yield from
делает. Он приостанавливает указатель стека и инструкции wrap
и запускает cofoo
. Обратите внимание, что wrap
остается приостановленным до cofoo
пор, пока cofoo
закончится cofoo
. Когда cofoo
приостанавливается или что-то отправляется, cofoo
напрямую подключается к вызывающему cofoo
.
1.4. Coroutines полностью вниз
Как установлено, yield from
позволяет подключать две области по другой промежуточной. При применении рекурсивно это означает, что верхняя часть стека может быть подключена к нижней части стека.
root -\
: \-> coro_a -yield-from-> coro_b --\
:/ <-+------------------------yield ---/
| :
:\ --+-- coro_a.send----------yield ---\
: coro_b <-/
Обратите внимание, что root
и coro_b
не знают друг о друге. Это делает сопрограммы намного чище, чем обратные вызовы: сопрограммы по-прежнему построены на соотношении 1:1, таких как подпрограммы. Coroutines приостанавливают и возобновляют весь существующий стек выполнения до обычной точки вызова.
Примечательно, что root
может иметь произвольное количество сопрограмм для возобновления. Тем не менее, он никогда не может возобновить больше одного одновременно. Корутины одного корня параллельны, но не параллельны!
1,5. Python async
и await
Объяснение до сих пор явно использовало yield
и yield from
словаря генераторов - базовая функциональность такая же. Новый синтаксис Python3.5 async
и await
основном для ясности.
def foo(): # subroutine?
return None
def foo(): # coroutine?
yield from foofoo() # generator? coroutine?
async def foo(): # coroutine!
await foofoo() # coroutine!
return None
async for
и async with
инструкциями необходимы, потому что вы нарушаете yield from/await
цепочку с with
инструкций bare и for
.
2. Анатомия простого цикла событий
Сама по себе сопрограмма не имеет понятия о том, чтобы дать контроль над другой сопрограммой. Это может привести только к управлению вызывающим абонентом в нижней части стека сопрограммы. Затем этот вызывающий абонент может переключиться на другую сопрограмму и запустить ее.
Этот корневой узел нескольких сопрограмм обычно представляет собой цикл событий: при приостановке сопрограммы дают событие, на которое он хочет возобновить. В свою очередь, цикл событий способен эффективно ждать появления этих событий. Это позволяет ему решить, какую сопрограмму выполнить дальше, или как подождать до возобновления.
Такая конструкция подразумевает, что существует набор заранее определенных событий, которые понимает цикл. Несколько сопрограмм await
друг друга, пока, наконец, не await
события. Это событие может напрямую связываться с циклом события, yield
управление.
loop -\
: \-> coroutine --await--> event --\
:/ <-+----------------------- yield --/
| :
| : # loop waits for event to happen
| :
:\ --+-- send(reply) -------- yield --\
: coroutine <--yield-- event <-/
Ключ состоит в том, что подвеска coroutine позволяет конвейеру событий и событиям напрямую взаимодействовать. Промежуточный стежок сопротектора не требует каких-либо знаний о том, какой цикл работает, и как работают события.
2.1.1. События во времени
Простейшее событие для обработки - это достижение момента времени. Это также основной блок потокового кода: поток многократно sleep
пока условие не будет истинным. Однако обычный sleep
блокирует выполнение сам по себе - мы хотим, чтобы другие сопрограммы не были заблокированы. Вместо этого мы хотим сообщить цикл события, когда он должен возобновить текущий стек сопрограммы.
2.1.2. Определение события
Событие - это просто значение, которое мы можем идентифицировать - будь то через перечисление, тип или другое удостоверение. Мы можем определить это с помощью простого класса, который хранит наше целевое время. Помимо хранения информации о событии, мы можем позволить await
класса напрямую.
class AsyncSleep:
"""Event to sleep until a point in time"""
def __init__(self, until: float):
self.until = until
# used whenever someone ''await' an instance of this Event
def __await__(self):
# yield this Event to the loop
yield self
def __repr__(self):
return '%s(until=%.1f)' % (self.__class__.__name__, self.until)
Этот класс сохраняет только событие - он не говорит, как его обрабатывать.
Единственная особенность - __await__
- это то, что ищет ключевое слово await
. Практически это итератор, но не доступен для обычного итерационного механизма.
2.2.1. Ожидание события
Теперь, когда у нас есть событие, как реагируют на него сопрограммы? Мы должны быть в состоянии выразить эквивалент sleep
, await
нашего события. Чтобы лучше понять, что происходит, мы ждем дважды в течение половины времени:
import time
async def asleep(duration: float):
"""await that ''duration'' seconds pass"""
await AsyncSleep(time.time() + duration / 2)
await AsyncSleep(time.time() + duration / 2)
Мы можем напрямую создать экземпляр и запустить эту сопрограмму. Подобно генератору, использование coroutine.send
запускает сопрограмму, пока не yield
результат.
coroutine = asleep(100)
while True:
print(coroutine.send(None))
time.sleep(0.1)
Это дает нам два события AsyncSleep
а затем StopIteration
когда выполняется сопрограмма. Обратите внимание, что единственная задержка - от time.sleep
в цикле! Каждый AsyncSleep
сохраняет только смещение от текущего времени.
2.2.2. Событие + Сон
На данный момент у нас есть два отдельных механизма:
- События
AsyncSleep
которые могут быть получены изнутри сопрограммы -
time.sleep
который может ждать, не влияя на сопрограммы
Примечательно, что эти два ортогональны: ни один из них не влияет или не вызывает другое. В результате мы можем разработать собственную стратегию sleep
чтобы удовлетворить задержку AsyncSleep
.
2,3. Наивный цикл событий
Если у нас есть несколько сопрограмм, каждый может рассказать нам, когда хочет проснуться. Затем мы можем подождать, пока первый из них не захочет возобновиться, а затем после него и так далее. Примечательно, что в каждом пункте мы заботимся только о том, какой из них следующий.
Это упрощает планирование:
- сортировать сопрограммы по их желаемому времени пробуждения
- выберите первый, который хочет проснуться
- подождать до этого момента
- запустить эту сопрограмму
- повторить с 1.
Тривиальная реализация не нуждается в каких-либо передовых концепциях. list
позволяет сортировать сопрограммы по дате. Ожидание - обычное time.sleep
. Выполнение сопрограмм работает так же, как и раньше, с coroutine.send
.
def run(*coroutines):
"""Cooperatively run all ''coroutines'' until completion"""
# store wake-up-time and coroutines
waiting = [(0, coroutine) for coroutine in coroutines]
while waiting:
# 2. pick the first coroutine that wants to wake up
until, coroutine = waiting.pop(0)
# 3. wait until this point in time
time.sleep(max(0.0, until - time.time()))
# 4. run this coroutine
try:
command = coroutine.send(None)
except StopIteration:
continue
# 1. sort coroutines by their desired suspension
if isinstance(command, AsyncSleep):
waiting.append((command.until, coroutine))
waiting.sort(key=lambda item: item[0])
Конечно, у этого есть достаточно места для улучшения. Мы можем использовать кучу для очереди ожидания или таблицы отправки событий. Мы также могли бы получить возвращаемые значения из StopIteration
и назначить их сопрограмме. Однако основной принцип остается неизменным.
2,4. Совместное ожидание
Событие AsyncSleep
и run
событий выполнения - это полностью работающая реализация событий времени.
async def sleepy(identifier: str = "coroutine", count=5):
for i in range(count):
print(identifier, 'step', i + 1, 'at %.2f' % time.time())
await asleep(0.1)
run(*(sleepy("coroutine %d" % j) for j in range(5)))
Это взаимодействует между каждым из пяти сопрограмм, каждый из которых приостанавливается на 0,1 секунды. Несмотря на то, что цикл событий является синхронным, он по-прежнему выполняет работу за 0,5 секунды вместо 2,5 секунд. Каждая сопрограмма держится и действует независимо.
3. Цикл событий ввода-вывода
Цикл событий, поддерживающий sleep
, подходит для опроса. Однако ожидание ввода/вывода в дескрипторе файла может быть выполнено более эффективно: операционная система реализует ввод-вывод и, следовательно, знает, какие дескрипторы готовы. В идеале цикл событий должен поддерживать явное событие "готово для ввода-вывода".
3.1. select
вызов
Python уже имеет интерфейс для запроса ОС для чтения операций ввода-вывода. Когда вызывается с помощью ручек для чтения или записи, он возвращает дескрипторы, готовые для чтения или записи:
readable, writeable, _ = select.select(rlist, wlist, xlist, timeout)
Например, мы можем open
файл для записи и дождаться его готовности:
write_target = open('/tmp/foo')
readable, writeable, _ = select.select([], [write_target], [])
После выбора возвращает, writeable
содержит наш открытый файл.
3.2. Основное событие ввода-вывода
Подобно запросу AsyncSleep
, нам нужно определить событие для ввода-вывода. С базовой логикой select
событие должно ссылаться на читаемый объект - например, open
файл. Кроме того, мы храним, сколько данных читать.
class AsyncRead:
def __init__(self, file, amount=1):
self.file = file
self.amount = amount
self._buffer = ''
def __await__(self):
while len(self._buffer) < self.amount:
yield self
# we only get here if ''read'' should not block
self._buffer += self.file.read(1)
return self._buffer
def __repr__(self):
return '%s(file=%s, amount=%d, progress=%d)' % (
self.__class__.__name__, self.file, self.amount, len(self._buffer)
)
Как и в случае с AsyncSleep
мы в основном просто сохраняем данные, необходимые для базового системного вызова. На этот раз __await__
может возобновляться несколько раз - пока наша желаемая amount
не будет прочитана. Кроме того, мы return
результат ввода-вывода вместо возобновления.
3.3. Увеличение цикла событий с чтением ввода-вывода
Основой для нашего цикла событий по-прежнему является run
определенный ранее. Во-первых, нам нужно отслеживать запросы на чтение. Это уже не отсортированное расписание, мы только сопоставляем запросы на чтение с сопрограммами.
# new
waiting_read = {} # type: Dict[file, coroutine]
Поскольку select.select
принимает параметр тайм-аута, мы можем использовать его вместо time.sleep
.
# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(reads), [], [])
Это дает нам все читаемые файлы - если они есть, мы запускаем соответствующую сопрограмму. Если их нет, мы достаточно долго ждали, пока наша текущая сопрограмма будет запущена.
# new - reschedule waiting coroutine, run readable coroutine
if readable:
waiting.append((until, coroutine))
waiting.sort()
coroutine = waiting_read[readable[0]]
Наконец, мы должны слушать запросы на чтение.
# new
if isinstance(command, AsyncSleep):
...
elif isinstance(command, AsyncRead):
...
3.4. Объединение
Выше было немного упрощение. Нам нужно сделать переход, чтобы не голодать спальные сопрограммы, если мы всегда можем читать. Нам нужно справляться с тем, чтобы не читать или ничего не ждать. Однако конечный результат все же помещается в 30 LOC.
def run(*coroutines):
"""Cooperatively run all ''coroutines'' until completion"""
waiting_read = {} # type: Dict[file, coroutine]
waiting = [(0, coroutine) for coroutine in coroutines]
while waiting or waiting_read:
# 2. wait until the next coroutine may run or read ...
try:
until, coroutine = waiting.pop(0)
except IndexError:
until, coroutine = float('inf'), None
readable, _, _ = select.select(list(waiting_read), [], [])
else:
readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
# ... and select the appropriate one
if readable and time.time() < until:
if until and coroutine:
waiting.append((until, coroutine))
waiting.sort()
coroutine = waiting_read.pop(readable[0])
# 3. run this coroutine
try:
command = coroutine.send(None)
except StopIteration:
continue
# 1. sort coroutines by their desired suspension ...
if isinstance(command, AsyncSleep):
waiting.append((command.until, coroutine))
waiting.sort(key=lambda item: item[0])
# ... or register reads
elif isinstance(command, AsyncRead):
waiting_read[command.file] = coroutine
3.5. Кооперативный ввод-вывод
Реализации AsyncSleep
, AsyncRead
и run
теперь полностью функциональны для сна и/или чтения. То же, что и для sleepy
, мы можем определить помощника для проверки чтения:
async def ready(path, amount=1024*32):
print('read', path, 'at', '%d' % time.time())
with open(path, 'rb') as file:
result = return await AsyncRead(file, amount)
print('done', path, 'at', '%d' % time.time())
print('got', len(result), 'B')
run(sleepy('background', 5), ready('/dev/urandom'))
Запустив это, мы видим, что наш ввод-вывод чередуется с ожидающей задачей:
id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B
4. Неблокирующий ввод-вывод
В то время как asyncio
ввода/вывода в файлах имеют концепцию, она не подходит для библиотеки, такой как asyncio
: вызов select
всегда возвращается для файлов, а оба open
и read
могут блокироваться неограниченно долго. Это блокирует все сопрограммы цикла событий - что плохо. Библиотеки, такие как aiofiles
используют потоки и синхронизацию для подделки неблокирующих aiofiles
ввода-вывода и событий в файле.
Однако сокеты позволяют неблокировать ввод-вывод - и их присущая задержка делает ее гораздо более критичной. При использовании в цикле событий ожидание данных и повторная попытка могут быть завернуты без блокировки.
4.1. Неблокирующее событие ввода-вывода
Подобно нашему AsyncRead
, мы можем определить событие suspend-and-read для сокетов. Вместо того, чтобы брать файл, мы берем сокет, который должен быть неблокирующим. Кроме того, наш __await__
использует socket.recv
вместо file.read
.
class AsyncRecv:
def __init__(self, connection, amount=1, read_buffer=1024):
assert not connection.getblocking(), 'connection must be non-blocking for async recv'
self.connection = connection
self.amount = amount
self.read_buffer = read_buffer
self._buffer = b''
def __await__(self):
while len(self._buffer) < self.amount:
try:
self._buffer += self.connection.recv(self.read_buffer)
except BlockingIOError:
yield self
return self._buffer
def __repr__(self):
return '%s(file=%s, amount=%d, progress=%d)' % (
self.__class__.__name__, self.connection, self.amount, len(self._buffer)
)
В отличие от AsyncRead
, __await__
выполняет действительно неблокирующий ввод-вывод. Когда данные доступны, он всегда читает. Когда данных нет, он всегда приостанавливается. Это означает, что цикл событий блокируется только при выполнении полезной работы.
4.2. Un-Blocking цикл события
Что касается цикла событий, ничего не меняется. Событие для прослушивания по-прежнему такое же, как и для файлов - дескриптор файла, помеченный готовым по select
.
# old
elif isinstance(command, AsyncRead):
waiting_read[command.file] = coroutine
# new
elif isinstance(command, AsyncRead):
waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
waiting_read[command.connection] = coroutine
На этом этапе должно быть очевидно, что AsyncRead
и AsyncRecv
являются AsyncRead
и тем же событием. Мы могли бы легко реорганизовать их как одно событие с заменяемым компонентом ввода-вывода. По сути, цикл событий, сопрограммы и события четко разделяют планировщик, произвольный промежуточный код и фактический ввод-вывод.
4,3. Уродливая сторона неблокирующего ввода-вывода
В принципе, то, что вы должны сделать в этот момент, - это повторить логику read
как recv
для AsyncRecv
. Тем не менее, теперь это гораздо более уродливо - вам нужно обрабатывать ранние возвращения, когда функции блокируются внутри ядра, но уступят вам контроль. Например, открытие соединения по сравнению с открытием файла намного дольше:
# file
file = open(path, 'rb')
# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
connection.connect((url, port))
except BlockingIOError:
pass
Короче говоря, осталось несколько десятков строк обработки исключений. На этом этапе уже работают события и цикл событий.
id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5
добавление
Пример кода в github