Генератор общего питона
Я пытаюсь воспроизвести реактивные расширения "разделяемой" наблюдаемой концепции с генераторами Python.
Скажем, у меня есть API, который дает мне бесконечный поток, который я могу использовать так:
def my_generator():
for elem in the_infinite_stream():
yield elem
Я мог бы использовать этот генератор несколько раз так:
stream1 = my_generator()
stream2 = my_generator()
И the_infinite_stream()
будет вызываться дважды (один раз для каждого генератора).
Теперь скажите, что the_infinite_stream()
- дорогостоящая операция. Есть ли способ "разделить" генератор между несколькими клиентами? Похоже, что тройник бы сделать это, но я должен заранее знать, как много независимых генераторов я хочу.
Идея состоит в том, что в других языках (Java, Swift), использующих реактивные расширения (RxJava, RxSwift) "разделяемые" потоки, я могу удобно дублировать поток на стороне клиента. Мне интересно, как это сделать в Python.
Примечание: я использую asyncio
Ответы
Ответ 1
Я взял tee
реализацию и модифицировано это такое вы можете иметь различное число генераторов от infinite_stream
:
import collections
def generators_factory(iterable):
it = iter(iterable)
deques = []
already_gone = []
def new_generator():
new_deque = collections.deque()
new_deque.extend(already_gone)
deques.append(new_deque)
def gen(mydeque):
while True:
if not mydeque: # when the local deque is empty
newval = next(it) # fetch a new value and
already_gone.append(newval)
for d in deques: # load it to all the deques
d.append(newval)
yield mydeque.popleft()
return gen(new_deque)
return new_generator
# test it:
infinite_stream = [1, 2, 3, 4, 5]
factory = generators_factory(infinite_stream)
gen1 = factory()
gen2 = factory()
print(next(gen1)) # 1
print(next(gen2)) # 1 even after it was produced by gen1
print(list(gen1)) # [2, 3, 4, 5] # the rest after 1
Чтобы кэшировать только некоторое количество значений, вы можете изменить already_gone = []
на already_gone = collections.deque(maxlen=size)
и добавить параметр size=None
в generators_factory
.
Ответ 2
Рассмотрим простые атрибуты класса.
Дано
def infinite_stream():
"""Yield a number from a (semi-)infinite iterator."""
# Alternatively, 'yield from itertools.count()'
yield from iter(range(100000000))
# Helper
def get_data(iterable):
"""Print the state of 'data' per stream."""
return ", ".join([f"{x.__name__}: {x.data}" for x in iterable])
Код
class SharedIterator:
"""Share the state of an iterator with subclasses."""
_gen = infinite_stream()
data = None
@staticmethod
def modify():
"""Advance the shared iterator + assign new data."""
cls = SharedIterator
cls.data = next(cls._gen)
демонстрация
Учитывая кортеж клиентских streams
(A
, B
и C
),
# Streams
class A(SharedIterator): pass
class B(SharedIterator): pass
class C(SharedIterator): pass
streams = A, B, C
давайте изменим и распечатаем состояние одного итератора, совместно используемого ими:
# Observe changed state in subclasses
A.modify()
print("1st access:", get_data(streams))
B.modify()
print("2nd access:", get_data(streams))
C.modify()
print("3rd access:", get_data(streams))
Выход
1st access: A: 0, B: 0, C: 0
2nd access: A: 1, B: 1, C: 1
3rd access: A: 2, B: 2, C: 2
Хотя любой поток может изменить итератор, атрибут класса является общим для подклассов.
Смотрите также
- Документы на
asyncio.Queue
- асинхронная альтернатива общему контейнеру - Опубликовать в Обозревателе Pattern +
asyncio
Ответ 3
Вы можете повторно вызывать "tee" для создания нескольких итераторов по мере необходимости.
it = iter([ random.random() for i in range(100)])
base, it_cp = itertools.tee(it)
_, it_cp2 = itertools.tee(base)
_, it_cp3 = itertools.tee(base)
Образец: http://tpcg.io/ZGc6l5.
Ответ 4
Вы можете использовать один генератор и "генераторы подписчика":
subscribed_generators = []
def my_generator():
while true:
elem = yield
do_something(elem) # or yield do_something(elem) depending on your actual use
def publishing_generator():
for elem in the_infinite_stream():
for generator in subscribed_generators:
generator.send(elem)
subscribed_generators.extend([my_generator(), my_generator()])
# Next is just ane example that forces iteration over 'the_infinite_stream'
for elem in publishing_generator():
pass
Вместо функции генератора вы можете также создать класс с методами: __next__
, __iter__
, send
, throw
. Таким образом, вы можете изменить MyGenerator.__init__
чтобы автоматически добавлять его новые экземпляры в subscribed_generators
.
Это несколько похоже на основанный на событиях подход с "тупой реализацией":
-
for elem in the_infinite_stream
похож на испускающее событие -
for generator...: generator.send
аналогично отправке события каждому подписчику.
Поэтому одним из способов реализации "более сложного, но структурированного решения" было бы использование подхода, основанного на событиях:
- Например, вы можете использовать asyncio.Event
- Или какое-то стороннее решение вроде aiopubsub
- Для любого из этих подходов вы должны
the_infinite_stream
событие для каждого элемента из the_infinite_stream
, и ваши экземпляры my_generator
должны быть подписаны на эти события.
Также можно использовать другие подходы, и лучший выбор зависит от деталей вашей задачи, от того, как вы используете цикл обработки событий в asyncio. Например:
-
Вы можете реализовать the_infinite_stream
(или оболочку для него) как некоторый класс с "курсорами" (объекты, которые отслеживают текущую позицию в потоке для разных подписчиков); затем каждый my_generator
регистрирует новый курсор и использует его для получения следующего элемента в бесконечном потоке. В этом подходе цикл обработки событий не будет автоматически пересматривать экземпляры my_generator
, которые могут потребоваться, если эти экземпляры "не равны" (например, имеют некоторую "балансировку приоритетов")
-
Промежуточный генератор, вызывающий все экземпляры my_generator
(как описано ранее). При таком подходе каждый экземпляр my_generator
автоматически возвращается в цикл обработки событий. Скорее всего, этот подход является потокобезопасным.
-
Событийные подходы:
-
используя asyncio.Event
. Аналогично использованию промежуточного генератора. Не потокобезопасный
-
aiopubsub.
-
то, что использует шаблон наблюдателя
-
Сделайте the_infinite_generator
(или оболочку для него) как "Singleton", который "кэширует" последнее событие. Некоторые подходы были описаны в других ответах. Можно использовать и другие "кеширующие" решения:
-
the_infinite_generator
один и тот же элемент один раз для каждого экземпляра the_infinite_generator
(используйте класс с пользовательским методом __new__
который отслеживает экземпляры, или используйте тот же экземпляр класса, у которого есть метод, возвращающий "сдвинутый" итератор по сравнению с the_infinite_loop
), пока кто-то не the_infinite_loop
специальный метод для экземпляра the_infinite_generator
( или в классе): infinite_gen.next_cycle
. В этом случае всегда должен быть какой-то "последний завершающий генератор/процессор", который в конце каждого цикла цикла обработки событий будет выполнять the_infinite_generator().next_cycle()
-
Подобно предыдущему, но одно и то же событие разрешено my_generator
несколько раз в одном и том же экземпляре my_generator
(поэтому они должны следить за этим случаем). В этом подходе the_infinite_generator().next_cycle()
может вызываться "периодически" с помощью loop.call_later или loop.cal_at. Этот подход может быть необходим, если "подписчики" должны быть в состоянии обрабатывать/анализировать: задержки, ограничения скорости, тайм-ауты между событиями и т.д.
-
Возможно множество других решений. Трудно предложить что-то конкретное, не глядя на вашу текущую реализацию и не зная, каково желаемое поведение генераторов, которые используют the_infinite_loop
Если я правильно понимаю ваше описание "общих" потоков, то вам действительно нужен "один" генератор the_infinite_stream
и "обработчик" для него. Пример, который пытается сделать это:
class StreamHandler:
def __init__(self):
self.__real_stream = the_infinite_stream()
self.__sub_streams = []
def get_stream(self):
sub_stream = [] # or better use some Queue/deque object. Using list just to show base principle
self.__sub_streams.append(sub_stream)
while True:
while sub_stream:
yield sub_stream.pop(0)
next(self)
def __next__(self):
next_item = next(self.__real_stream)
for sub_stream in self.__sub_steams:
sub_stream.append(next_item)
some_global_variable = StreamHandler()
# Or you can change StreamHandler.__new__ to make it singleton, or you can create an instance at the point of creation of event-loop
def my_generator():
for elem in some_global_variable.get_stream():
yield elem
Но если все ваши объекты my_generator
инициализируются в одной и той же точке бесконечного потока и "одинаково" повторяются внутри цикла, тогда этот подход привнесет "ненужные" накладные расходы памяти для каждого "sub_stream" (используется как очередь). Нет необходимости: потому что эти очереди всегда будут одинаковыми (но это можно оптимизировать: если есть какой-то существующий "пустой" подпоток, то он может быть повторно использован для новых подпотоков с некоторыми изменениями в " pop
-logic"). И много-много других реализаций и нюансов можно обсудить
Ответ 5
Если вам не нужно кэшировать значения, я думаю, что это должно сработать:
from contextlib import contextmanager
@contextmanager
def read_multiple(*args, **kwargs):
f = open(*args, **kwargs)
class Iterator:
def __iter__(self):
yield from f
f.seek(0)
yield Iterator()
f.close()