Python: Подождите все фьючерсы `concurrent.futures.ThreadPoolExecutor`
Я дал concurrent.futures.ThreadPoolExecutor
набор задач, и я хочу подождать, пока они все не закончатся, прежде чем продолжить поток. Как я могу это сделать, не сохраняя всех фьючерсов и называя wait
на них? (Мне нужно действие для исполнителя.)
Ответы
Ответ 1
Просто позвоните Executor.shutdown
:
shutdown(wait=True)
Сигнал исполнителю о том, что он должен освобождать любые ресурсы, которые он используя, когда текущие ожидающие фьючерсы выполняются. Вызовы до Executor.submit()
и Executor.map()
, сделанных после выключения поднять RuntimeError
.
Если wait True
, то этот метод не вернется, пока все ожидающие фьючерсы не будут выполненное выполнение, и ресурсы, связанные с исполнителем, были освобождены.
Однако, если вы отслеживаете свои фьючерсы в списке, вы можете не закрывать исполнятеля для дальнейшего использования с помощью функции futures.wait()
:
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
Дождитесь экземпляров Future
(возможно, созданных разными Executor
экземпляры), заданные fs
для завершения. Возвращает названный 2-кортеж множеств. Первый набор, названный завершенным, содержит фьючерсы, которые завершено (закончено или отменено) до завершения ожидания. второй набор, названный not_done, содержит незавершенные фьючерсы.
обратите внимание, что если вы не предоставляете timeout
, он ждет завершения всех фьючерсов.
Вы также можете использовать futures.as_completed()
, однако вам придется перебирать его.
Ответ 2
Ответ Bakuriu правильный. Просто немного растянуть. Как мы все знаем, менеджер контекста имеет методы __enter__
и __exit__
. Вот как class Executor
(базовый класс ThreadPoolExecutor) определен
class Executor(object):
# other methods
def shutdown(self, wait=True):
"""Clean-up the resources associated with the Executor.
It is safe to call this method several times. Otherwise, no other
methods can be called after this one.
Args:
wait: If True then shutdown will not return until all running
futures have finished executing and the resources used by the
executor have been reclaimed.
"""
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown(wait=True)
return False
И это ThreadPoolExecutor
, который фактически определяет метод shutdown
class ThreadPoolExecutor(_base.Executor):
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown = True
self._work_queue.put(None)
if wait:
for t in self._threads:
t.join()