Прерывания клавиатуры с пулом многопроцессорности python
Как я могу обрабатывать события KeyboardInterrupt с пулами многопроцессорности python? Вот простой пример:
from multiprocessing import Pool
from time import sleep
from sys import exit
def slowly_square(i):
sleep(1)
return i*i
def go():
pool = Pool(8)
try:
results = pool.map(slowly_square, range(40))
except KeyboardInterrupt:
# **** THIS PART NEVER EXECUTES. ****
pool.terminate()
print "You cancelled the program!"
sys.exit(1)
print "\nFinally, here are the results: ", results
if __name__ == "__main__":
go()
При запуске кода выше, KeyboardInterrupt
возникает, когда я нажимаю ^C
, но процесс просто зависает в этой точке, и я должен убить его извне.
Я хочу, чтобы в любой момент можно нажать ^C
и заставить все процессы выйти изящно.
Ответы
Ответ 1
Это ошибка Python. При ожидании условия в threading.Condition.wait() KeyboardInterrupt никогда не отправляется. Репро:
import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"
Исключение KeyboardInterrupt не будет доставлено до тех пор, пока wait() не вернется, и он никогда не вернется, поэтому прерывание никогда не произойдет. KeyboardInterrupt должен почти наверняка прервать условие ожидания.
Обратите внимание, что это не происходит, если указан таймаут; cond.wait(1) немедленно получит прерывание. Итак, обходным путем является указание тайм-аута. Для этого замените
results = pool.map(slowly_square, range(40))
с
results = pool.map_async(slowly_square, range(40)).get(9999999)
или аналогичный.
Ответ 2
Из того, что я недавно нашел, лучшим решением является создание рабочих процессов для полного игнорирования SIGINT и ограничение всего кода очистки родительским процессом. Это устраняет проблему как для рабочих процессов бездействия, так и для занятости, и в ваших дочерних процессах не требуется код обработки ошибок.
import signal
...
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
...
def main()
pool = multiprocessing.Pool(size, init_worker)
...
except KeyboardInterrupt:
pool.terminate()
pool.join()
Объяснение и полный пример кода можно найти на http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ и http://github.com/jreese/multiprocessing-keyboardinterrupt соответственно.
Ответ 3
По некоторым причинам обрабатываются только исключения, унаследованные от базового класса Exception
. В качестве обходного пути вы можете повторно поднять свой KeyboardInterrupt
как экземпляр Exception
:
from multiprocessing import Pool
import time
class KeyboardInterruptError(Exception): pass
def f(x):
try:
time.sleep(x)
return x
except KeyboardInterrupt:
raise KeyboardInterruptError()
def main():
p = Pool(processes=4)
try:
print 'starting the pool map'
print p.map(f, range(10))
p.close()
print 'pool map complete'
except KeyboardInterrupt:
print 'got ^C while pool mapping, terminating the pool'
p.terminate()
print 'pool is terminated'
except Exception, e:
print 'got exception: %r, terminating the pool' % (e,)
p.terminate()
print 'pool is terminated'
finally:
print 'joining pool processes'
p.join()
print 'join complete'
print 'the end'
if __name__ == '__main__':
main()
Обычно вы получаете следующий результат:
staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end
Итак, если вы нажмете ^C
, вы получите:
staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
Ответ 4
Обычно эта простая структура работает для Ctrl - C в пуле:
def signal_handle(_signal, frame):
print "Stopping the Jobs."
signal.signal(signal.SIGINT, signal_handle)
Как было сказано в нескольких похожих сообщениях:
Захват клавиатурного прерывания в Python без возможности прогона
Ответ 5
Кажется, есть две проблемы, которые делают исключения, в то время как многопроцессорность раздражает. Первый (отметил Гленн), что вам нужно использовать map_async
с таймаутом вместо map
, чтобы получить немедленный ответ (т.е. Не завершить обработку всего списка). Второй (отмеченный Андреем) заключается в том, что многопроцессорность не захватывает исключения, которые не наследуются от Exception
(например, SystemExit
). Итак, вот мое решение, которое касается обоих из них:
import sys
import functools
import traceback
import multiprocessing
def _poolFunctionWrapper(function, arg):
"""Run function under the pool
Wrapper around function to catch exceptions that don't inherit from
Exception (which aren't caught by multiprocessing, so that you end
up hitting the timeout).
"""
try:
return function(arg)
except:
cls, exc, tb = sys.exc_info()
if issubclass(cls, Exception):
raise # No worries
# Need to wrap the exception with something multiprocessing will recognise
import traceback
print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))
def _runPool(pool, timeout, function, iterable):
"""Run the pool
Wrapper around pool.map_async, to handle timeout. This is required so as to
trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool
Further wraps the function in _poolFunctionWrapper to catch exceptions
that don't inherit from Exception.
"""
return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)
def myMap(function, iterable, numProcesses=1, timeout=9999):
"""Run the function on the iterable, optionally with multiprocessing"""
if numProcesses > 1:
pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
mapFunc = functools.partial(_runPool, pool, timeout)
else:
pool = None
mapFunc = map
results = mapFunc(function, iterable)
if pool is not None:
pool.close()
pool.join()
return results
Ответ 6
Я нашел, что на данный момент лучшим решением является не использование функции multiprocessing.pool, а скорее сворачивание ваших собственных возможностей пула. Я привел пример, демонстрирующий ошибку с применением apply_async, а также пример, показывающий, как вообще не использовать возможности пула.
http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/
Ответ 7
Проголосовавший ответ не решает основной проблемы, но похожего побочного эффекта.
Джесси Ноллер, автор многопроцессорной библиотеки, объясняет, как правильно обращаться с CTRL + C при использовании multiprocessing.Pool
в старом сообщении в блоге.
import signal
from multiprocessing import Pool
def initializer():
"""Ignore CTRL+C in the worker process."""
signal.signal(signal.SIGINT, signal.SIG_IGN)
pool = Pool(initializer=initializer)
try:
pool.map(perform_download, dowloads)
except KeyboardInterrupt:
pool.terminate()
pool.join()
Ответ 8
Я новичок в Python. Я искал повсюду ответ и наткнулся на это, а также на другие блоги и видео с YouTube. Я попытался скопировать код автора выше и воспроизвести его на моем python 2.7.13 в бите Windows 7 64-. Это близко к тому, чего я хочу достичь.
Я сделал мои дочерние процессы, чтобы игнорировать ControlC и сделать родительский процесс завершенным. Похоже, что в обход дочернего процесса я избегаю этой проблемы.
#!/usr/bin/python
from multiprocessing import Pool
from time import sleep
from sys import exit
def slowly_square(i):
try:
print "<slowly_square> Sleeping and later running a square calculation..."
sleep(1)
return i * i
except KeyboardInterrupt:
print "<child processor> Don't care if you say CtrlC"
pass
def go():
pool = Pool(8)
try:
results = pool.map(slowly_square, range(40))
except KeyboardInterrupt:
pool.terminate()
pool.close()
print "You cancelled the program!"
exit(1)
print "Finally, here are the results", results
if __name__ == '__main__':
go()
Часть, начинающаяся с pool.terminate()
никогда не выполняется.
Ответ 9
Вы можете попробовать использовать метод apply_async для объекта Pool, например:
import multiprocessing
import time
from datetime import datetime
def test_func(x):
time.sleep(2)
return x**2
def apply_multiprocessing(input_list, input_function):
pool_size = 5
pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)
try:
jobs = {}
for value in input_list:
jobs[value] = pool.apply_async(input_function, [value])
results = {}
for value, result in jobs.items():
try:
results[value] = result.get()
except KeyboardInterrupt:
print "Interrupted by user"
pool.terminate()
break
except Exception as e:
results[value] = e
return results
except Exception:
raise
finally:
pool.close()
pool.join()
if __name__ == "__main__":
iterations = range(100)
t0 = datetime.now()
results1 = apply_multiprocessing(iterations, test_func)
t1 = datetime.now()
print results1
print "Multi: {}".format(t1 - t0)
t2 = datetime.now()
results2 = {i: test_func(i) for i in iterations}
t3 = datetime.now()
print results2
print "Non-multi: {}".format(t3 - t2)
Выход:
100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000
Преимущество этого метода заключается в том, что результаты, обработанные до прерывания, будут возвращены в словаре результатов:
>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
Ответ 10
Как ни странно, похоже, что вы должны обрабатывать KeyboardInterrupt
у детей. Я ожидал, что это сработает как написано... попробуйте изменить slowly_square
на:
def slowly_square(i):
try:
sleep(1)
return i * i
except KeyboardInterrupt:
print 'You EVIL bastard!'
return 0
Это должно работать так, как вы ожидали.