Включить функции с обратным вызовом в генераторы Python?
Функция минимизации Scipy (просто для использования в качестве примера) имеет возможность добавления функции обратного вызова на каждом шаге. Поэтому я могу сделать что-то вроде
def my_callback(x):
print x
scipy.optimize.fmin(func, x0, callback=my_callback)
Есть ли способ использовать функцию обратного вызова для создания версии генератора fmin, чтобы я мог это сделать,
for x in my_fmin(func,x0):
print x
Кажется, что это возможно с некоторой комбинацией доходностей и отправлений, но я могу все придумать.
Ответы
Ответ 1
Как указано в комментариях, вы можете сделать это в новом потоке, используя Queue
. Недостатком является то, что вам все равно нужно каким-то образом получить доступ к окончательному результату (что fmin
возвращает в конце). В моем примере ниже используется дополнительный обратный вызов, чтобы что-то с ним делать (другим вариантом было бы просто дать его, хотя ваш код вызова должен был бы различать результаты итерации конечных результатов):
from thread import start_new_thread
from Queue import Queue
def my_fmin(func, x0, end_callback=(lambda x:x), timeout=None):
q = Queue() # fmin produces, the generator consumes
job_done = object() # signals the processing is done
# Producer
def my_callback(x):
q.put(x)
def task():
ret = scipy.optimize.fmin(func,x0,callback=my_callback)
q.put(job_done)
end_callback(ret) # "Returns" the result of the main call
# Starts fmin in a new thread
start_new_thread(task,())
# Consumer
while True:
next_item = q.get(True,timeout) # Blocks until an input is available
if next_item is job_done:
break
yield next_item
Обновление:, чтобы заблокировать выполнение следующей итерации до тех пор, пока потребитель не завершит обработку последнего, также необходимо использовать task_done
и join
.
# Producer
def my_callback(x):
q.put(x)
q.join() # Blocks until task_done is called
# Consumer
while True:
next_item = q.get(True,timeout) # Blocks until an input is available
if next_item is job_done:
break
yield next_item
q.task_done() # Unblocks the producer, so a new iteration can start
Обратите внимание, что maxsize=1
не требуется, так как новый элемент не будет добавлен в очередь до тех пор, пока последний не будет использован.
Обновление 2: Также обратите внимание, что, если все элементы в конечном итоге не будут получены этим генератором, созданный поток будет блокирован (он будет блокироваться навсегда, и его ресурсы никогда не будут выпущены). Производитель ждет в очереди, и поскольку он хранит ссылку на эту очередь, он никогда не будет возвращен gc, даже если потребитель. Затем очередь станет недоступной, поэтому никто не сможет освободить блокировку.
Чистое решение для этого неизвестно, если возможно вообще (поскольку оно будет зависеть от конкретной функции, используемой в месте fmin
). Обходной путь можно сделать с помощью timeout
, если производитель вызывает исключение, если put
блокирует слишком долго:
q = Queue(maxsize=1)
# Producer
def my_callback(x):
q.put(x)
q.put("dummy",True,timeout) # Blocks until the first result is retrieved
q.join() # Blocks again until task_done is called
# Consumer
while True:
next_item = q.get(True,timeout) # Blocks until an input is available
q.task_done() # (one "task_done" per "get")
if next_item is job_done:
break
yield next_item
q.get() # Retrieves the "dummy" object (must be after yield)
q.task_done() # Unblocks the producer, so a new iteration can start
Ответ 2
Генератор в качестве сопрограммы (без резьбы)
Пусть FakeFtp
с функцией retrbinary
использует callback, вызываемый при каждом успешном чтении фрагмента данных:
class FakeFtp(object):
def __init__(self):
self.data = iter(["aaa", "bbb", "ccc", "ddd"])
def login(self, user, password):
self.user = user
self.password = password
def retrbinary(self, cmd, cb):
for chunk in self.data:
cb(chunk)
Недостатком использования простой функции обратного вызова является то, что она вызывается повторно, и функция обратного вызова не может легко сохранять контекст между вызовами.
Следующий код определяет генератор process_chunks
, который сможет получать порции данных один за другим и обрабатывать их. В отличие от простого обратного вызова, здесь мы можем сохранить всю обработку внутри одной функции без потери контекста.
from contextlib import closing
from itertools import count
def main():
processed = []
def process_chunks():
for i in count():
try:
# (repeatedly) get the chunk to process
chunk = yield
except GeneratorExit:
# finish_up
print("Finishing up.")
return
else:
# Here process the chunk as you like
print("inside coroutine, processing chunk:", i, chunk)
product = "processed({i}): {chunk}".format(i=i, chunk=chunk)
processed.append(product)
with closing(process_chunks()) as coroutine:
# Get the coroutine to the first yield
coroutine.next()
ftp = FakeFtp()
# next line repeatedly calls 'coroutine.send(data)'
ftp.retrbinary("RETR binary", cb=coroutine.send)
# each callback "jumps" to 'yield' line in 'process_chunks'
print("processed result", processed)
print("DONE")
Чтобы увидеть код в действии, поместите класс FakeFtp
, код, показанный выше, и следующую строку:
main()
в один файл и назовите его:
$ python headsandtails.py
('inside coroutine, processing chunk:', 0, 'aaa')
('inside coroutine, processing chunk:', 1, 'bbb')
('inside coroutine, processing chunk:', 2, 'ccc')
('inside coroutine, processing chunk:', 3, 'ddd')
Finishing up.
('processed result', ['processed(0): aaa', 'processed(1): bbb', 'processed(2): ccc', 'processed(3): ddd'])
DONE
Как это устроено
processed = []
, чтобы показать, что у генератора process_chunks
не должно быть проблем с его внешним контекстом. Все обернуто в def main():
чтобы доказать, нет необходимости использовать глобальные переменные.
def process_chunks()
- это ядро решения. Он может иметь входные параметры одним выстрелом (здесь не используются), но основной точкой, где он получает входные данные, является каждая строка yield
возвращающая то, что кто-то отправляет через .send(data)
в экземпляр этого генератора. Можно coroutine.send(chunk)
но в этом примере это делается с помощью обратного вызова, ссылающегося на эту функцию callback.send
.
Обратите внимание, что в реальном решении нет проблем с множественным yield
в коде, они обрабатываются один за другим. Это может быть использовано, например, для чтения (и игнорирования) заголовка файла CSV, а затем продолжить обработку записей с данными.
Мы могли бы создать экземпляр и использовать генератор следующим образом:
coroutine = process_chunks()
# Get the coroutine to the first yield
coroutine.next()
ftp = FakeFtp()
# next line repeatedly calls 'coroutine.send(data)'
ftp.retrbinary("RETR binary", cb=coroutine.send)
# each callback "jumps" to 'yield' line in 'process_chunks'
# close the coroutine (will throw the 'GeneratorExit' exception into the
# 'process_chunks' coroutine).
coroutine.close()
Реальный код использует contextlib
контекста closing
contextlib
чтобы всегда вызывать coroutine.close()
.
Выводы
Это решение не предоставляет своего рода итератор для потребления данных в традиционном стиле "извне". С другой стороны, мы можем:
- использовать генератор "изнутри"
- сохранить всю итеративную обработку в одной функции без прерывания между обратными вызовами
- при желании использовать внешний контекст
- обеспечить полезные результаты снаружи
- все это можно сделать без использования потоков
Кредиты: Решение в значительной степени вдохновлено SO ответом. Python FTP "чанк" итератор (без загрузки всего файла в память), написанный user2357112
Ответ 3
Концепция Используйте блокирующую очередь с maxsize=1
и моделью производителя/потребителя.
Выполняется обратный вызов, затем следующий вызов обратного вызова будет блокироваться в полной очереди.
Затем потребитель выдает значение из очереди, пытается получить другое значение и блокирует чтение.
Изготовителю разрешено нажать в очередь, промыть и повторить.
Использование:
def dummy(func, arg, callback=None):
for i in range(100):
callback(func(arg+i))
# Dummy example:
for i in Iteratorize(dummy, lambda x: x+1, 0):
print(i)
# example with scipy:
for i in Iteratorize(scipy.optimize.fmin, func, x0):
print(i)
Может использоваться как ожидаемый для итератора:
for i in take(5, Iteratorize(dummy, lambda x: x+1, 0)):
print(i)
Класс итерации:
from thread import start_new_thread
from Queue import Queue
class Iteratorize:
"""
Transforms a function that takes a callback
into a lazy iterator (generator).
"""
def __init__(self, func, ifunc, arg, callback=None):
self.mfunc=func
self.ifunc=ifunc
self.c_callback=callback
self.q = Queue(maxsize=1)
self.stored_arg=arg
self.sentinel = object()
def _callback(val):
self.q.put(val)
def gentask():
ret = self.mfunc(self.ifunc, self.stored_arg, callback=_callback)
self.q.put(self.sentinel)
if self.c_callback:
self.c_callback(ret)
start_new_thread(gentask, ())
def __iter__(self):
return self
def next(self):
obj = self.q.get(True,None)
if obj is self.sentinel:
raise StopIteration
else:
return obj
Возможно, с некоторой очисткой можно принять *args
и **kwargs
для обертываемой функции и/или обратного вызова окончательного результата.
Ответ 4
Решение для обработки неблокирующих обратных вызовов
Решение с использованием threading
и queue
довольно хорошее, высокопроизводительное и кроссплатформенное, возможно, лучшее.
Здесь я приведу это не слишком плохое решение, которое в основном предназначено для обработки неблокирующих обратных вызовов, например вызывается из родительской функции через threading.Thread(target=callback).start()
или другими неблокирующими способами.
import pickle
import select
import subprocess
def my_fmin(func, x0):
# open a process to use as a pipeline
proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
def my_callback(x):
# x might be any object, not only str, so we use pickle to dump it
proc.stdin.write(pickle.dumps(x).replace(b'\n', b'\\n') + b'\n')
proc.stdin.flush()
from scipy import optimize
optimize.fmin(func, x0, callback=my_callback)
# this is meant to handle non-blocking callbacks, e.g. called somewhere
# through 'threading.Thread(target=callback).start()'
while select.select([proc.stdout], [], [], 0)[0]:
yield pickle.loads(proc.stdout.readline()[:-1].replace(b'\\n', b'\n'))
# close the process
proc.communicate()
Затем вы можете использовать функцию следующим образом:
# unfortunately, 'scipy.optimize.fmin' callback is blocking.
# so this example is just for showing how-to.
for x in my_fmin(lambda x: x**2, 3):
print(x)
Хотя это решение кажется довольно простым и читабельным, оно не столь высокоэффективно, как решения threading
и queue
, поскольку:
- Процессы намного тяжелее, чем потоки.
- Передача данных через канал вместо памяти выполняется намного медленнее.
Кроме того, он не работает в Windows, поскольку модуль select
в Windows может обрабатывать только сокеты, а не каналы и другие файловые дескрипторы.
Ответ 5
Как насчет
data = []
scipy.optimize.fmin(func,x0,callback=data.append)
for line in data:
print line
Если нет, что именно вы хотите делать с данными генератора?
Ответ 6
Вариант ответа Фрица:
- Поддерживает
send
, чтобы выбрать возвращаемое значение для обратного вызова
- Поддерживает
throw
, чтобы выбрать исключение для обратного вызова
- Поддерживает
close
, чтобы изящно завершить работу
- Не вычисляет элемент очереди, пока он не будет запрошен
Полный код с тестами можно найти на github
import queue
import threading
import collections.abc
class generator_from_callback(collections.abc.Generator):
def __init__(self, expr):
"""
expr: a function that takes a callback
"""
self._expr = expr
self._done = False
self._ready_queue = queue.Queue(1)
self._done_queue = queue.Queue(1)
self._done_holder = [False]
# local to avoid reference cycles
ready_queue = self._ready_queue
done_queue = self._done_queue
done_holder = self._done_holder
def callback(value):
done_queue.put((False, value))
cmd, *args = ready_queue.get()
if cmd == 'close':
raise GeneratorExit
elif cmd == 'send':
return args[0]
elif cmd == 'throw':
raise args[0]
def thread_func():
try:
cmd, *args = ready_queue.get()
if cmd == 'close':
raise GeneratorExit
elif cmd == 'send':
if args[0] is not None:
raise TypeError("can't send non-None value to a just-started generator")
elif cmd == 'throw':
raise args[0]
ret = expr(callback)
raise StopIteration(ret)
except BaseException as e:
done_holder[0] = True
done_queue.put((True, e))
self._thread = threading.Thread(target=thread_func)
self._thread.start()
def __next__(self):
return self.send(None)
def send(self, value):
if self._done_holder[0]:
raise StopIteration
self._ready_queue.put(('send', value))
is_exception, val = self._done_queue.get()
if is_exception:
raise val
else:
return val
def throw(self, exc):
if self._done_holder[0]:
raise StopIteration
self._ready_queue.put(('throw', exc))
is_exception, val = self._done_queue.get()
if is_exception:
raise val
else:
return val
def close(self):
if not self._done_holder[0]:
self._ready_queue.put(('close',))
self._thread.join()
def __del__(self):
self.close()
Который работает как:
In [3]: def callback(f):
...: ret = f(1)
...: print("gave 1, got {}".format(ret))
...: f(2)
...: print("gave 2")
...: f(3)
...:
In [4]: i = generator_from_callback(callback)
In [5]: next(i)
Out[5]: 1
In [6]: i.send(4)
gave 1, got 4
Out[6]: 2
In [7]: next(i)
gave 2, got None
Out[7]: 3
In [8]: next(i)
StopIteration
Для scipy.optimize.fmin
вы должны использовать generator_from_callback(lambda c: scipy.optimize.fmin(func, x0, callback=c))