Python: вывести команду Ctrl-C. Подсказка "действительно хочу выйти (y/n)", возобновить выполнение, если нет
У меня есть программа, которая может иметь длительное выполнение. В основном модуле у меня есть следующее:
import signal
def run_program()
...time consuming execution...
def Exit_gracefully(signal, frame):
... log exiting information ...
... close any open files ...
sys.exit(0)
if __name__ == '__main__':
signal.signal(signal.SIGINT, Exit_gracefully)
run_program()
Это отлично работает, но мне хотелось бы приостановить выполнение при захвате SIGINT, вызвав пользователя, если они действительно захотят выйти, и возобновить, когда я остановился в run_program(), если они решат, что они не хотят выйти.
Единственный способ, которым я могу это сделать, - это запустить программу в отдельном потоке, удерживая основной поток, ожидающий его, и готов поймать SIGINT. Если пользователь хочет выйти из основного потока, он может выполнить очистку и убить дочерний поток.
Есть ли более простой способ?
Ответы
Ответ 1
Обработчики сигналов питона не кажутся реальными обработчиками сигналов; то есть они происходят после факта, в нормальном потоке и после того, как обработчик C уже вернулся. Таким образом, вы попытаетесь поместить свою логику вывода в обработчик сигнала. Поскольку обработчик сигнала работает в основном потоке, он также блокирует выполнение.
Что-то вроде этого кажется хорошо работающим.
import signal
import time
import sys
def run_program():
while True:
time.sleep(1)
print("a")
def exit_gracefully(signum, frame):
# restore the original signal handler as otherwise evil things will happen
# in raw_input when CTRL+C is pressed, and our signal handler is not re-entrant
signal.signal(signal.SIGINT, original_sigint)
try:
if raw_input("\nReally quit? (y/n)> ").lower().startswith('y'):
sys.exit(1)
except KeyboardInterrupt:
print("Ok ok, quitting")
sys.exit(1)
# restore the exit gracefully handler here
signal.signal(signal.SIGINT, exit_gracefully)
if __name__ == '__main__':
# store the original SIGINT handler
original_sigint = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, exit_gracefully)
run_program()
Код восстанавливает исходный обработчик сигнала в течение raw_input
; raw_input
сам не воссоединяется и снова вводит его
приведет к тому, что RuntimeError: can't re-enter readline
будет поднят из time.sleep
, чего мы не хотим, поскольку это сложнее поймать, чем KeyboardInterrupt
. Скорее, мы даем 2 последовательных Ctrl-C, чтобы поднять KeyboardInterrupt
.
Ответ 2
от https://gist.github.com/rtfpessoa/e3b1fe0bbfcd8ac853bf
#!/usr/bin/env python
import signal
import sys
def signal_handler(signal, frame):
# your code here
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
Bye!
Ответ 3
когда процедура завершится, сделайте что-нибудь
предположим, что вы просто хотите, чтобы процедура что-то сделала после завершения задачи
import time
class TestTask:
def __init__(self, msg: str):
self.msg = msg
def __enter__(self):
print(f'Task Start!:{self.msg}')
return self
def __exit__(self, exc_type, exc_val, exc_tb):
print('Task End!')
@staticmethod
def do_something():
try:
time.sleep(5)
except:
pass
with TestTask('Hello World') as task:
task.do_something()
когда процесс покидает with
, который будет запускаться __exit__
, даже если произойдет KeyboardInterrupt, которые совпадают.
если вам не нравится видеть ошибку, добавьте try ... except ...
@staticmethod
def do_something():
try:
time.sleep(5)
except:
pass
пауза, продолжение, сброс и т.д.
У меня нет идеального решения, но оно может быть полезно для вас.
Это означает разделить ваш процесс на множество подпроцессов и сохранить его, так что Finish.it больше не будет выполняться, так как вы обнаружите, что он уже выполнен.
import time
from enum import Enum
class Action(Enum):
EXIT = 0
CONTINUE = 1
RESET = 2
class TestTask:
def __init__(self, msg: str):
self.msg = msg
def __enter__(self):
print(f'Task Start!:{self.msg}')
return self
def __exit__(self, exc_type, exc_val, exc_tb):
print('Task End!')
def do_something(self):
tuple_job = (self._foo, self._bar) # implement by yourself
list_job_state = [0] * len(tuple_job)
dict_keep = {} # If there is a need to communicate between jobs, and you dont want to use class members, you can use this method.
while 1:
try:
for idx, cur_process in enumerate(tuple_job):
if not list_job_state[idx]:
cur_process(dict_keep)
list_job_state[idx] = True
if all(list_job_state):
print('100%')
break
except KeyboardInterrupt:
print('KeyboardInterrupt. input action:')
msg = '\n\t'.join([f"{action + ':':<10}{str(act_number)}" for act_number, action in
enumerate([name for name in vars(Action) if not name.startswith('_')])
])
case = Action(int(input(f'\t{msg}\n:')))
if case == Action.EXIT:
break
if case == Action.RESET:
list_job_state = [0] * len(tuple_job)
@staticmethod
def _foo(keep_dict: dict) -> bool: # implement by yourself
time.sleep(2)
print('1%')
print('2%')
print('...')
print('60%')
keep_dict['status_1'] = 'status_1'
return True
@staticmethod
def _bar(keep_dict: dict) -> bool: # implement by yourself
time.sleep(2)
print('61%')
print(keep_dict.get('status_1'))
print('...')
print('99%')
return True
with TestTask('Hello World') as task:
task.do_something()
консоль
input action number:2
Task Start!:Hello World
1%
2%
...
60%
KeyboardInterrupt. input action:
EXIT: 0
CONTINUE: 1
RESET: 2
:1
61%
status_1
...
99%
100%
Task End!