Ответ 1
Вы правы, что не можете выразить то, что хотите выразить в cacheiter
. Декоратор inlineCallbacks
не позволит вам иметь функцию, которая возвращает итератор. Если вы украшаете функцию с ней, то результатом будет функция, которая всегда возвращает Deferred
. Это для чего.
Отчасти это затрудняет то, что итераторы не работают с асинхронным кодом. Если есть Отсрочка, участвующая в создании элементов вашего итератора, тогда элементы, которые выходят из вашего итератора, сначала будут отложены.
Вы можете сделать что-то подобное, чтобы учесть это:
@inlineCallbacks
def process_work():
for element_deferred in some_jobs:
element = yield element_deferred
work_on(element)
Это может работать, но выглядит особенно странно. Поскольку генераторы могут уступать только своему вызывающему абоненту (не, например, вызывающему абоненту), тотератор some_jobs
ничего не может поделать; только код, лексически расположенный в пределах process_work
, может привести к отложенному трамплину с трафиком inlineCallbacks
для ожидания.
Если вы не возражаете против этого шаблона, тогда мы могли бы визуализировать ваш код, написанный примерно так:
from twisted.internet.task import deferLater
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet import reactor
class cacheiter(object):
def __init__(self, cached):
self._cached = iter(cached.items())
self._remaining = []
def __iter__(self):
return self
@inlineCallbacks
def next(self):
# First re-fill the list of synchronously-producable values if it is empty
if not self._remaining:
for name, value in self._cached:
# Wait on this Deferred to determine if this cache item should be included
if (yield check_condition(name, value)):
# If so, put all of its values into the value cache so the next one
# can be returned immediately next time this method is called.
self._remaining.extend([(name, k, v) for (k, v) in value.items()])
# Now actually give out a value, if there is one.
if self._remaining:
returnValue(self._remaining.pop())
# Otherwise the entire cache has been visited and the iterator is complete.
# Sadly we cannot signal completion with StopIteration, because the iterator
# protocol isn't going to add an errback to this Deferred and check for
# StopIteration. So signal completion with a simple None value.
returnValue(None)
@inlineCallbacks
def process_chunk(myiter, num):
for i in xrange(num):
nextval = yield myiter.next()
if nextval is None:
# The iterator signaled completion via the special None value.
# Processing is complete.
returnValue(True)
# Otherwise process the value.
yield some_processing(nextval)
# Indicate there is more processing to be done.
returnValue(False)
def sleep(sec):
# Simple helper to delay asynchronously for some number of seconds.
return deferLater(reactor, sec, lambda: None)
@inlineCallbacks
def process_loop(cached):
myiter = cacheiter(cached)
while True:
# Loop processing 10 items from myiter at a time, until process_chunk signals
# there are no values left.
result = yield process_chunk(myiter, 10)
if result:
print 'All done'
break
print 'More left'
# Insert the 5 second delay before starting on the next chunk.
yield sleep(5)
d = process_loop(cached)
Другой подход, который вы могли бы использовать, заключается в использовании twisted.internet.task.cooperate
. cooperate
принимает итератор и потребляет его, полагая, что его потребление потенциально дорогостоящее, и расщепление работы на нескольких итерациях реактора. Принимая определение cacheiter
сверху:
from twisted.internet.task import cooperate
def process_loop(cached):
finished = []
def process_one(value):
if value is None:
finished.append(True)
else:
return some_processing(value)
myiter = cacheiter(cached)
while not finished:
value_deferred = myiter.next()
value_deferred.addCallback(process_one)
yield value_deferred
task = cooperate(process_loop(cached))
d = task.whenDone()