Обработка сигналов в многопоточном Python
Это должно быть очень просто, и я очень удивлен, что мне не удалось найти ответы на эти вопросы уже в stackoverflow.
У меня есть программа вроде демона, которая должна отвечать на сигналы SIGTERM и SIGINT, чтобы хорошо работать с выскочкой. Я читал, что лучший способ сделать это - запустить основной цикл программы в отдельном потоке из основного потока и позволить основному потоку обрабатывать сигналы. Затем, когда принимается сигнал, обработчик сигнала должен сообщать основному циклу о выходе, установив флаг часового, который обычно проверяется в основном цикле.
Я пытался это сделать, но он не работает так, как я ожидал. См. Код ниже:
from threading import Thread
import signal
import time
import sys
stop_requested = False
def sig_handler(signum, frame):
sys.stdout.write("handling signal: %s\n" % signum)
sys.stdout.flush()
global stop_requested
stop_requested = True
def run():
sys.stdout.write("run started\n")
sys.stdout.flush()
while not stop_requested:
time.sleep(2)
sys.stdout.write("run exited\n")
sys.stdout.flush()
signal.signal(signal.SIGTERM, sig_handler)
signal.signal(signal.SIGINT, sig_handler)
t = Thread(target=run)
t.start()
t.join()
sys.stdout.write("join completed\n")
sys.stdout.flush()
Я проверил это двумя способами:
1)
$ python main.py > output.txt&
[2] 3204
$ kill -15 3204
2)
$ python main.py
ctrl+c
В обоих случаях я ожидаю, что это будет записано в вывод:
run started
handling signal: 15
run exited
join completed
В первом случае программа выходит, но все, что я вижу, это:
run started
Во втором случае сигнал SIGTERM, по-видимому, игнорируется при нажатии ctrl + c, и программа не выходит.
Что мне здесь не хватает?
Ответы
Ответ 1
Проблема заключается в том, что, как описано в Выполнение обработчиков сигналов Python:
Обработчик сигнала Python не выполняется в низкоуровневом (C) сигнальном обработчике. Вместо этого низкоуровневый обработчик сигналов устанавливает флаг, который сообщает виртуальной машине выполнить соответствующий обработчик сигнала Python в более поздней точке (например, в следующей инструкции байт-кода)
...
Долгосрочный расчет, реализованный исключительно в C (например, согласование регулярных выражений на большом тексте), может работать без прерывания в течение произвольного промежутка времени независимо от полученных сигналов. Обработчики сигналов Python будут вызываться, когда вычисление завершается.
Ваш основной поток заблокирован на threading.Thread.join
, что в конечном итоге означает, что он заблокирован на C при вызове pthread_join
. Конечно, это не "длительный расчет", это блок в syscall... но тем не менее, пока этот вызов не завершится, ваш обработчик сигнала не сможет работать.
И, хотя на некоторых платформах pthread_join
произойдет сбой EINTR
по сигналу, а на других он не будет. В linux я полагаю, что это зависит от того, выбираете ли вы стиль BSD или поведение по умолчанию siginterrupt
, но по умолчанию нет.
Итак, что вы можете с этим сделать?
Хорошо, я уверен, что изменения в обработке сигналов в Python 3.3 фактически изменили поведение по умолчанию в Linux, поэтому вам не нужно ничего делать, если вы Обновить; просто запустите под 3.3+, и ваш код будет работать так, как вы ожидаете. По крайней мере, это для меня с CPython 3.4 на OS X и 3.3 в Linux. (Если я ошибаюсь в этом, я не уверен, является ли это ошибкой в CPython или нет, поэтому вы можете поднять его на python-list, а не открывать проблему...)
С другой стороны, pre-3.3, модуль signal
определенно не раскрывает инструменты, необходимые для исправления этой проблемы самостоятельно. Итак, если вы не можете обновить до 3.3, решение должно ждать чего-то прерывистого, например, Condition
или Event
. Дочерний поток уведомляет событие непосредственно перед его завершением, и основной поток ожидает события, прежде чем он присоединяется к дочернему потоку. Это определенно взломанно. И я не могу найти ничего, что гарантирует, что это изменит ситуацию; это просто работает для меня в различных сборках CPython 2.7 и 3.2 на OS X и 2.6 и 2.7 на Linux...
Ответ 2
Ответ abarnert был на месте. Я все еще использую Python 2.7. Чтобы решить эту проблему для себя, я написал класс InterruptableThread.
В настоящий момент он не позволяет передавать дополнительные аргументы в цель потока. Join также не принимает параметр тайм-аута. Это потому, что мне не нужно это делать. Вы можете добавить его, если хотите. Вы, вероятно, захотите удалить выходные операторы, если используете это самостоятельно. Они просто существуют как способ комментирования и тестирования.
import threading
import signal
import sys
class InvalidOperationException(Exception):
pass
# noinspection PyClassHasNoInit
class GlobalInterruptableThreadHandler:
threads = []
initialized = False
@staticmethod
def initialize():
signal.signal(signal.SIGTERM, GlobalInterruptableThreadHandler.sig_handler)
signal.signal(signal.SIGINT, GlobalInterruptableThreadHandler.sig_handler)
GlobalInterruptableThreadHandler.initialized = True
@staticmethod
def add_thread(thread):
if threading.current_thread().name != 'MainThread':
raise InvalidOperationException("InterruptableThread objects may only be started from the Main thread.")
if not GlobalInterruptableThreadHandler.initialized:
GlobalInterruptableThreadHandler.initialize()
GlobalInterruptableThreadHandler.threads.append(thread)
@staticmethod
def sig_handler(signum, frame):
sys.stdout.write("handling signal: %s\n" % signum)
sys.stdout.flush()
for thread in GlobalInterruptableThreadHandler.threads:
thread.stop()
GlobalInterruptableThreadHandler.threads = []
class InterruptableThread:
def __init__(self, target=None):
self.stop_requested = threading.Event()
self.t = threading.Thread(target=target, args=[self]) if target else threading.Thread(target=self.run)
def run(self):
pass
def start(self):
GlobalInterruptableThreadHandler.add_thread(self)
self.t.start()
def stop(self):
self.stop_requested.set()
def is_stop_requested(self):
return self.stop_requested.is_set()
def join(self):
try:
while self.t.is_alive():
self.t.join(timeout=1)
except (KeyboardInterrupt, SystemExit):
self.stop_requested.set()
self.t.join()
sys.stdout.write("join completed\n")
sys.stdout.flush()
Класс может использоваться двумя разными способами. Вы можете подклассом InterruptableThread:
import time
import sys
from interruptable_thread import InterruptableThread
class Foo(InterruptableThread):
def __init__(self):
InterruptableThread.__init__(self)
def run(self):
sys.stdout.write("run started\n")
sys.stdout.flush()
while not self.is_stop_requested():
time.sleep(2)
sys.stdout.write("run exited\n")
sys.stdout.flush()
sys.stdout.write("all exited\n")
sys.stdout.flush()
foo = Foo()
foo2 = Foo()
foo.start()
foo2.start()
foo.join()
foo2.join()
Или вы можете использовать его больше, как работает threading.thread. Однако метод run должен принимать объект InterruptableThread как параметр.
import time
import sys
from interruptable_thread import InterruptableThread
def run(t):
sys.stdout.write("run started\n")
sys.stdout.flush()
while not t.is_stop_requested():
time.sleep(2)
sys.stdout.write("run exited\n")
sys.stdout.flush()
t1 = InterruptableThread(run)
t2 = InterruptableThread(run)
t1.start()
t2.start()
t1.join()
t2.join()
sys.stdout.write("all exited\n")
sys.stdout.flush()
Сделай с этим то, что будешь.
Ответ 3
Я столкнулся с той же проблемой, что и сигнал не обрабатывается при присоединении нескольких потоков. После чтения abarnert я перешел на Python 3 и решил проблему. Но я действительно хочу изменить всю свою программу на python 3. Итак, я решил свою программу, избегая вызова потока join() до отправки сигнала. Ниже мой код.
Это не очень хорошо, но я решил свою программу в python 2.7. Мой вопрос был отмечен как дублированный, поэтому я поставил свое решение здесь.
import threading, signal, time, os
RUNNING = True
threads = []
def monitoring(tid, itemId=None, threshold=None):
global RUNNING
while(RUNNING):
print "PID=", os.getpid(), ";id=", tid
time.sleep(2)
print "Thread stopped:", tid
def handler(signum, frame):
print "Signal is received:" + str(signum)
global RUNNING
RUNNING=False
#global threads
if __name__ == '__main__':
signal.signal(signal.SIGUSR1, handler)
signal.signal(signal.SIGUSR2, handler)
signal.signal(signal.SIGALRM, handler)
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGQUIT, handler)
print "Starting all threads..."
thread1 = threading.Thread(target=monitoring, args=(1,), kwargs={'itemId':'1', 'threshold':60})
thread1.start()
threads.append(thread1)
thread2 = threading.Thread(target=monitoring, args=(2,), kwargs={'itemId':'2', 'threshold':60})
thread2.start()
threads.append(thread2)
while(RUNNING):
print "Main program is sleeping."
time.sleep(30)
for thread in threads:
thread.join()
print "All threads stopped."