Ответ 1
Давайте рассмотрим, как применить две совокупные функции к одному и тому же итератору, которые мы можем только исчерпать. Первоначальная попытка (sum
жестких кодов и max
для краткости, но тривиально обобщаемая для произвольного числа агрегатных функций) может выглядеть так:
def max_and_sum_buffer(it):
content = list(it)
p = sum(content)
m = max(content)
return p, m
Эта реализация имеет недостаток, что она хранит сразу все сгенерированные элементы в памяти, несмотря на то, что обе функции отлично способны обрабатывать потоки. Вопрос предвосхищает этот отказ и явно запрашивает результат, который будет создан без буферизации вывода итератора. Можно ли сделать это?
Последовательное исполнение: itertools.tee
Это, безусловно, кажется возможным. В конце концов, итераторы Python являются внешними, поэтому каждый итератор уже способен приостановить себя. Насколько сложно обеспечить адаптер, который разбивает итератор на два новых итератора, которые предоставляют один и тот же контент? В самом деле, это точно описание itertools.tee
, которое отлично подходит для параллельной итерации:
def max_and_sum_tee(it):
it1, it2 = itertools.tee(it)
p = sum(it1) # XXX
m = max(it2)
return p, m
Вышеприведенный результат дает правильный результат, но не работает так, как нам бы хотелось. Проблема в том, что мы не повторяемся параллельно. Совокупные функции, такие как sum
и max
никогда не приостанавливаются - каждый из них настаивает на том, чтобы потреблять все содержимое итератора перед получением результата. Таким образом, sum
будет исчерпать it1
до того, как у max
был шанс запустить вообще. Исчерпывающие элементы it1
при выходе из it2
будут заставлять эти элементы накапливаться внутри внутреннего FIFO, разделяемого между двумя итераторами. Это неизбежно здесь - поскольку max(it2)
должен видеть одни и те же элементы, у tee
нет выбора, кроме как накапливать их. (Более подробную информацию о tee
см. В этом сообщении.)
Другими словами, нет никакой разницы между этой реализацией и первой, за исключением того, что первая, по крайней мере, делает буферизацию явной. Чтобы исключить буферизацию, sum
и max
должны выполняться параллельно, а не один за другим.
Темы: concurrent.futures
Посмотрим, что произойдет, если мы запустим агрегированные функции в отдельных потоках, все еще используем tee
для дублирования исходного итератора:
def max_and_sum_threads_simple(it):
it1, it2 = itertools.tee(it)
with concurrent.futures.ThreadPoolExecutor(2) as executor:
sum_future = executor.submit(lambda: sum(it1))
max_future = executor.submit(lambda: max(it2))
return sum_future.result(), max_future.result()
Теперь sum
и max
фактически выполняются параллельно (насколько позволяет GIL), причем потоки управляются отличным модулем concurrent.futures
. Он имеет фатальный недостаток, однако: для tee
не для буферизации данных, sum
и max
должны обрабатывать их детали точно такой же скоростью. Если он хоть немного быстрее, чем другой, они будут дрейфовать друг от друга, а tee
будет буферизовать все промежуточные элементы. Поскольку нет способа предсказать, как быстро каждый из них будет работать, количество буферизации является одновременно непредсказуемым и имеет неприятный худший случай буферизации всего.
Чтобы гарантировать отсутствие буферизации, tee
должен быть заменен специальным генератором, который ничего не блокирует и блокирует, пока все потребители не будут наблюдать предыдущее значение, прежде чем перейти к следующему. Как и раньше, каждый потребитель работает в своем собственном потоке, но теперь вызывающий поток занят запуском производителя, цикл, который фактически выполняет итерацию над исходным итератором и сигнализирует о наличии нового значения. Вот реализация:
def max_and_sum_threads(it):
STOP = object()
next_val = None
consumed = threading.Barrier(2 + 1) # 2 consumers + 1 producer
val_id = 0
got_val = threading.Condition()
def send(val):
nonlocal next_val, val_id
consumed.wait()
with got_val:
next_val = val
val_id += 1
got_val.notify_all()
def produce():
for elem in it:
send(elem)
send(STOP)
def consume():
last_val_id = -1
while True:
consumed.wait()
with got_val:
got_val.wait_for(lambda: val_id != last_val_id)
if next_val is STOP:
return
yield next_val
last_val_id = val_id
with concurrent.futures.ThreadPoolExecutor(2) as executor:
sum_future = executor.submit(lambda: sum(consume()))
max_future = executor.submit(lambda: max(consume()))
produce()
return sum_future.result(), max_future.result()
Это довольно какой-то код для чего-то столь прост концептуально, но он необходим для правильной работы.
produce()
циклы поверх внешнего итератора и отправляет предметы потребителям, по одному значению за раз. Он использует барьер, удобный примитив синхронизации, добавленный в Python 3.2, чтобы ждать, пока все потребители будут сделаны со старым значением, прежде чем перезаписывать его новым в next_val
. Когда новое значение действительно готово, состояние передается. consume()
- это генератор, который передает полученные значения по мере их поступления, до обнаружения STOP
. Кодекс может быть обобщен, запуская любое количество агрегатных функций параллельно, создавая потребителей в цикле и корректируя их количество при создании барьера.
Недостатком этой реализации является то, что она требует создания потоков (возможно, облегчается путем создания пула потоков в глобальном масштабе) и очень тщательной синхронизации на каждом итерационном проходе. Эта синхронизация разрушает производительность - эта версия почти в 2000 раз медленнее, чем однопоточный tee
, и в 475 раз медленнее простой, но не детерминированной версии с резьбой.
Тем не менее, до тех пор, пока используются потоки, в какой-то форме не следует избегать синхронизации. Чтобы полностью исключить синхронизацию, мы должны отказаться от потоков и переключиться на совместную многозадачность. Вопрос в том, можно ли приостановить выполнение обычных синхронных функций, таких как sum
и max
, чтобы переключаться между ними?
Волокна: зеленая
Оказывается, что greenlet
модуль расширения сторонних позволяет точно это. Greenlets - это реализация волокон, легкие микропотоки, которые явно переключаются между собой. Это своего рода генераторы Python, которые используют yield
для приостановки, за исключением того, что зелья предлагают гораздо более гибкий механизм подвески, позволяющий выбрать, кому приостановить действие.
Это позволяет легко max_and_sum
версию max_and_sum
на max_and_sum
:
def max_and_sum_greenlet(it):
STOP = object()
consumers = None
def send(val):
for g in consumers:
g.switch(val)
def produce():
for elem in it:
send(elem)
send(STOP)
def consume():
g_produce = greenlet.getcurrent().parent
while True:
val = g_produce.switch()
if val is STOP:
return
yield val
sum_result = []
max_result = []
gsum = greenlet.greenlet(lambda: sum_result.append(sum(consume())))
gsum.switch()
gmax = greenlet.greenlet(lambda: max_result.append(max(consume())))
gmax.switch()
consumers = (gsum, gmax)
produce()
return sum_result[0], max_result[0]
Логика такая же, но с меньшим количеством кода. Как и раньше, produce
производит значения, полученные из исходного итератора, но его send
не беспокоит синхронизацию, так как это не обязательно, когда все однопоточное. Вместо этого он явно переключается на каждого потребителя, в свою очередь, чтобы сделать свое дело, а потребитель добросовестно переключается обратно. Пройдя через всех потребителей, производитель готов к следующему переходу.
Результаты извлекаются с использованием промежуточного списка из одного элемента, поскольку greenlet не предоставляет доступ к возвращаемому значению целевой функции (и также threading.Thread
, поэтому мы выбрали для concurrent.futures
выше).
Тем не менее, есть минусы использования зеленых. Во-первых, они не поставляются со стандартной библиотекой, вам нужно установить расширение greenlet. Тогда, greenlet по своей сути не переносится, потому что код переключения стека не поддерживается ОС и компилятором и может рассматриваться как хак (хотя и очень умный). Ориентация Python на WebAssembly или JVM или GraalVM вряд ли будет поддерживать greenlet. Это не настоящая проблема, но это определенно нужно иметь в виду надолго.
Корутинцы: асинчио
Начиная с Python 3.5, Python предоставляет собственные сопрограммы. В отличие от окунь и аналогичные генераторам, сопрограммы отличаются от обычных функций и должны определяться с помощью async def
. Coroutines не могут быть легко выполнены из синхронного кода, они должны обрабатываться планировщиком, который доводит их до завершения. Планировщик также известен как цикл событий, поскольку его другое задание - получать события ввода-вывода и передавать их соответствующим обратным вызовам и сопрограммным сообщениям. В стандартной библиотеке это роль asyncio
модуля.
Перед внедрением max_and_sum на основе max_and_sum
мы должны сначала разрешить препятствие. В отличие от greenlet, asyncio может только приостановить выполнение сопрограмм, а не произвольных функций. Поэтому нам нужно заменить sum
и max
сопрограммами, которые делают по сути одно и то же. Это так же просто, как реализовать их очевидным образом, только заменяя for
async for
, позволяя асинхронному итератору приостанавливать сопрограмму в ожидании следующего значения:
async def asum(it):
s = 0
async for elem in it:
s += elem
return s
async def amax(it):
NONE_YET = object()
largest = NONE_YET
async for elem in it:
if largest is NONE_YET or elem > largest:
largest = elem
if largest is NONE_YET:
raise ValueError("amax() arg is an empty sequence")
return largest
# or, using https://github.com/vxgmichel/aiostream
#
#from aiostream.stream import accumulate
#def asum(it):
# return accumulate(it, initializer=0)
#def amax(it):
# return accumulate(it, max)
Можно было бы разумно спросить, не обманывает ли создание новой пары совокупных функций; в конце концов, предыдущие решения были осторожны, чтобы использовать существующие встроенные значения sum
и max
. Ответ будет зависеть от точной интерпретации вопроса, но я бы сказал, что новые функции разрешены, потому что они никоим образом не являются специфическими для данной задачи. Они делают то же самое, что делают встроенные модули, но потребляют асинхронные итераторы. Я подозреваю, что единственная причина, по которой такие функции еще не существуют где-то в стандартной библиотеке, объясняется тем, что сопрограммы и асинхронные итераторы являются относительно новой.
С этой точки зрения, мы можем перейти к написанию max_and_sum
в качестве сопрограммы:
async def max_and_sum_asyncio(it):
loop = asyncio.get_event_loop()
STOP = object()
next_val = loop.create_future()
consumed = loop.create_future()
used_cnt = 2 # number of consumers
async def produce():
for elem in it:
next_val.set_result(elem)
await consumed
next_val.set_result(STOP)
async def consume():
nonlocal next_val, consumed, used_cnt
while True:
val = await next_val
if val is STOP:
return
yield val
used_cnt -= 1
if not used_cnt:
consumed.set_result(None)
consumed = loop.create_future()
next_val = loop.create_future()
used_cnt = 2
else:
await consumed
s, m, _ = await asyncio.gather(asum(consume()), amax(consume()),
produce())
return s, m
Хотя эта версия основана на переключении между сопрограммами внутри одного потока, точно так же, как тот, который использует зелень, выглядит иначе. asyncio не обеспечивает явное переключение сопрограмм, он заставляет задачу переключиться на await
приостановку/возобновление примитива. Целью await
может быть другая сопрограмма, но также абстрактное "будущее", местозаполнитель значений, который будет заполнен позже другой сопрограммой. Как только ожидаемое значение становится доступным, цикл события автоматически возобновляет выполнение сопрограммы, при этом выражение await
вычисляется до предоставленного значения. Поэтому вместо того, чтобы produce
переход к потребителям, он приостанавливается, ожидая будущего, которое прибудет, как только все потребители будут наблюдать за производимой стоимостью.
consume()
- это асинхронный генератор, который похож на обычный генератор, за исключением того, что он создает асинхронный итератор, который наши агрегированные сопрограммы уже готовы принять с помощью async for
. __next__
итератор, эквивалентный __next__
, называется __anext__
и является сопрограммой, позволяющей сопрограмме, которая исчерпывает асинхронный итератор, приостанавливаться, ожидая появления нового значения. Когда работающий асинхронный генератор приостанавливается на await
, это наблюдается async for
как приостановка неявного вызова __anext__
. consume()
делает именно это, когда он ждет значений, предоставляемых produce
и, по мере их появления, передает их в совокупные сопрограммы, такие как asum
и amax
. Ожидание реализуется с помощью next_val
будущего, которое несет в себе следующий элемент из it
. Ожидая, что будущее внутри consume()
приостанавливает генератор асинхронных сигналов, а вместе с ним и сводную сопрограмму.
Преимущество этого подхода по сравнению с явным переключением зеленых состоит в том, что он значительно упрощает объединение сопрограмм, которые не знают друг о друге в один и тот же цикл событий. Например, можно было max_and_sum
запустить два экземпляра max_and_sum
параллельно (в том же потоке) или запустить более сложную агрегированную функцию, которая вызывала бы дополнительный асинхронный код для выполнения вычислений.
Следующая функция удобства показывает, как выполнить вышеуказанное из кода, отличного от asyncio:
def max_and_sum_asyncio_sync(it):
# trivially instantiate the coroutine and execute it in the
# default event loop
coro = max_and_sum_asyncio(it)
return asyncio.get_event_loop().run_until_complete(coro)
Спектакль
Измерение и сравнение производительности этих подходов к параллельному исполнению может вводить в заблуждение, поскольку sum
и max
практически не обрабатываются, что чрезмерно подчеркивает накладные расходы при распараллеливании. Относитесь к ним так же, как к любым микробизнесам с большим количеством соли. Сказав это, пусть все равно посмотрит на цифры!
Измерения производились с использованием Python 3.6. Функции выполнялись только один раз и заданный range(10000)
, их время измерялось путем вычитания time.time()
до и после выполнения. Вот результаты:
-
max_and_sum_buffer
иmax_and_sum_tee
: 0,66 мс - почти то же самое время для обоих, причем версияtee
немного быстрее. -
max_and_sum_threads_simple
: 2,7 мс. Это время очень мало связано с недетерминированной буферизацией, поэтому это может быть измерение времени для запуска двух потоков и синхронизации, выполняемой Python. -
max_and_sum_threads
: 1,29 секунды, безусловно самый медленный вариант, ~ 2000 раз медленнее, чем самый быстрый. Этот ужасный результат, вероятно, вызван комбинацией множественных синхронизаций, выполняемых на каждом этапе итерации и их взаимодействием с GIL. -
max_and_sum_greenlet
: 25,5 мс, медленный по сравнению с исходной версией, но намного быстрее, чем потоковая версия. С достаточно сложной совокупной функцией можно представить, используя эту версию в производстве. -
max_and_sum_asyncio
: 351 мс, почти в 14 раз медленнее, чем версия greenlet. Это неутешительный результат, потому что асинхронные сопрограммы более легкие, чем зеленые, и переключение между ними должно быть намного быстрее, чем переключение между волокнами. Вполне вероятно, что накладные расходы на запуск планировщика сопроцессора и цикла событий (который в этом случае является избыточным, учитывая, что код не делает IO) разрушает производительность на этом микро-контроле. -
max_and_sum_asyncio
с использованиемuvloop
: 125 мс. Это более чем в два раза превышает скорость обычного асинхронного, но все же почти в 5 раз медленнее, чем зелень.
Запуск примеров под PyPy не приносит значительного ускорения, на самом деле большинство примеров работают немного медленнее, даже после их запуска несколько раз, чтобы обеспечить прогрев JIT. Функция asyncio требует переписать, чтобы не использовать генераторы async (поскольку PyPy на этой записи реализует Python 3.5) и выполняет несколько менее 100 мс. Это сопоставимо с производительностью CPython + uvloop, то есть лучше, но не драматично по сравнению с greenlet.