Ответ 1
Один готовый вариант, который может помочь с этим, - twisted.internet.defer.DeferredSemaphore
. Это асинхронная версия обычного (счетного) семафора, который вы, возможно, уже знаете, если вы сделали многопрограммированное программирование.
Семафор A (счет) очень похож на мьютекс (блокировка). Но если мьютекс можно получить только один раз до соответствующего выпуска, семафор (подсчет) можно настроить таким образом, чтобы произвольное (но определенное) количество приобретений было успешным до того, как требуются соответствующие выпуски.
Вот пример использования DeferredSemaphore
для запуска десяти асинхронных операций, но для запуска не более трех из них сразу:
from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def main():
sem = DeferredSemaphore(3)
jobs = []
for i in range(10):
jobs.append(sem.run(async, i))
d = gatherResults(jobs)
d.addCallback(lambda ignored: reactor.stop())
reactor.run()
if __name__ == '__main__':
main()
DeferredSemaphore
также имеет явные методы acquire
и release
, но метод run
настолько удобен, что почти всегда вы хотите. Он вызывает метод acquire
, который возвращает Deferred
. К этому первому Deferred
он добавляет обратный вызов, который вызывает функцию, которую вы передали (вместе с любыми позиционными или ключевыми аргументами). Если эта функция возвращает a Deferred
, то к этой секунде Deferred
добавляется обратный вызов, который вызывает метод release
.
Также обрабатывается синхронный случай, немедленно позвонив release
. Ошибки также обрабатываются, позволяя им распространяться, но делая необходимым release
, чтобы оставить DeferredSemaphore
в согласованном состоянии. Результат функции, переданной в run
(или результат возврата Deferred
), становится результатом Deferred
, возвращаемого run
.
Другой возможный подход может основываться на DeferredQueue
и cooperate
. DeferredQueue
в основном похож на обычную очередь, но его метод get
возвращает Deferred
. Если в момент вызова не было элементов в очереди, Deferred
не будет срабатывать до тех пор, пока элемент не будет добавлен.
Вот пример:
from random import randrange
from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def assign(jobs):
# Create new jobs to be processed
jobs.put(randrange(10))
reactor.callLater(randrange(10), assign, jobs)
def worker(jobs):
while True:
yield jobs.get().addCallback(async)
def main():
jobs = DeferredQueue()
for i in range(10):
jobs.put(i)
assign(jobs)
for i in range(3):
cooperate(worker(jobs))
reactor.run()
if __name__ == '__main__':
main()
Обратите внимание, что рабочая функция async
такая же, как и в первом примере. Однако на этот раз также существует функция worker
, которая явно вытягивает задания из DeferredQueue
и обрабатывает их с помощью async
(добавив async
в качестве обратного вызова Deferred
, возвращаемого get
)), Генератор worker
управляется cooperate
, который повторяет его один раз после каждого Deferred
, который дает огни. Затем основной цикл запускает три из этих рабочих генераторов, так что в любой момент времени будут выполняться три задания.
Этот подход включает в себя немного больше кода, чем подход DeferredSemaphore
, но имеет некоторые преимущества, которые могут быть интересными. Во-первых, cooperate
возвращает экземпляр CooperativeTask
, который имеет полезные методы, такие как pause
, resume
и еще пару других. Кроме того, все задания, назначенные одному и тому же сотруднику, будут сотрудничать друг с другом в планировании, чтобы не перегружать цикл событий (и это то, что дает API его имя). На стороне DeferredQueue
также можно установить ограничения на количество элементов, ожидающих обработки, поэтому вы можете избежать полной перегрузки сервера (например, если ваши графические процессоры застревают и перестают выполнять задачи). Если код, вызывающий put
, обрабатывает исключение переполнения очереди, вы можете использовать это как давление, чтобы попытаться прекратить прием новых заданий (возможно, шунтирование их на другой сервер или предупреждение администратора). Выполнение подобных действий с помощью DeferredSemaphore
немного сложнее, поскольку нет возможности ограничить количество заданий, ожидающих получения семафора.