Подпроцесс python с тайм-аутом и большим выходом (> 64K)
Я хочу выполнить процесс, ограничить время выполнения некоторым таймаутом в секундах и захватить вывод, полученный процессом. И я хочу сделать это на windows, linux и freebsd.
Я попытался реализовать это тремя способами:
-
cmd - без тайм-аута и subprocess.PIPE для вывода данных.
BEHAVIOR: работает как ожидалось, но не поддерживает таймаут, мне нужен тайм-аут...
-
cmd_to - с тайм-аутом и подпроцессом .PIPE для вывода данных.
ПОВЕДЕНИЕ: Блокирует выполнение подпроцесса при выходе >= 2 ^ 16 байт.
-
cmd_totf - с тайм-аутом и tempfile.NamedTemporaryfile для вывода данных.
BEHAVIOR: работает как ожидалось, но использует временные файлы на диске.
Они доступны ниже для более тщательного осмотра.
Как видно из приведенного ниже результата, тогда таймаут-код блокирует выполнение подпроцесса при использовании подпроцесса. PIPE и вывод из подпроцессa >= 2 ^ 16 байт.
В документации к подпроцессу указано, что это ожидается при вызове process.wait() и использовании subprocessing.PIPE, однако при использовании process.poll() не отображаются предупреждения, поэтому что здесь происходит не так?
У меня есть решение в cmd_totf, которое использует модуль tempfile, но компромисс заключается в том, что он записывает вывод на диск, что я бы ДЕЙСТВИТЕЛЬНО хотел избежать.
Итак, мои вопросы:
- Что я делаю неправильно в cmd_to?
- Есть ли способ сделать то, что я хочу, и не использовать tempfiles/сохранить вывод в памяти.
Script, чтобы сгенерировать кучу вывода ('exp_gen.py'):
#!/usr/bin/env python
import sys
output = "b"*int(sys.argv[1])
print output
Три различных реализации (cmd, cmd_to, cmd_totf) оберток вокруг подпроцесса .Popen:
#!/usr/bin/env python
import subprocess, time, tempfile
bufsize = -1
def cmd(cmdline, timeout=60):
"""
Execute cmdline.
Uses subprocessing and subprocess.PIPE.
"""
p = subprocess.Popen(
cmdline,
bufsize = bufsize,
shell = False,
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE
)
out, err = p.communicate()
returncode = p.returncode
return (returncode, err, out)
def cmd_to(cmdline, timeout=60):
"""
Execute cmdline, limit execution time to 'timeout' seconds.
Uses subprocessing and subprocess.PIPE.
"""
p = subprocess.Popen(
cmdline,
bufsize = bufsize,
shell = False,
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE
)
t_begin = time.time() # Monitor execution time
seconds_passed = 0
while p.poll() is None and seconds_passed < timeout:
seconds_passed = time.time() - t_begin
time.sleep(0.1)
#if seconds_passed > timeout:
#
# try:
# p.stdout.close() # If they are not closed the fds will hang around until
# p.stderr.close() # os.fdlimit is exceeded and cause a nasty exception
# p.terminate() # Important to close the fds prior to terminating the process!
# # NOTE: Are there any other "non-freed" resources?
# except:
# pass
#
# raise TimeoutInterrupt
out, err = p.communicate()
returncode = p.returncode
return (returncode, err, out)
def cmd_totf(cmdline, timeout=60):
"""
Execute cmdline, limit execution time to 'timeout' seconds.
Uses subprocessing and tempfile instead of subprocessing.PIPE.
"""
output = tempfile.NamedTemporaryFile(delete=False)
error = tempfile.NamedTemporaryFile(delete=False)
p = subprocess.Popen(
cmdline,
bufsize = 0,
shell = False,
stdin = None,
stdout = output,
stderr = error
)
t_begin = time.time() # Monitor execution time
seconds_passed = 0
while p.poll() is None and seconds_passed < timeout:
seconds_passed = time.time() - t_begin
time.sleep(0.1)
#if seconds_passed > timeout:
#
# try:
# p.stdout.close() # If they are not closed the fds will hang around until
# p.stderr.close() # os.fdlimit is exceeded and cause a nasty exception
# p.terminate() # Important to close the fds prior to terminating the process!
# # NOTE: Are there any other "non-freed" resources?
# except:
# pass
#
# raise TimeoutInterrupt
p.wait()
returncode = p.returncode
fd = open(output.name)
out = fd.read()
fd.close()
fd = open(error.name)
err = fd.read()
fd.close()
error.close()
output.close()
return (returncode, err, out)
if __name__ == "__main__":
implementations = [cmd, cmd_to, cmd_totf]
bytes = ['65535', '65536', str(1024*1024)]
timeouts = [5]
for timeout in timeouts:
for size in bytes:
for i in implementations:
t_begin = time.time()
seconds_passed = 0
rc, err, output = i(['exp_gen.py', size], timeout)
seconds_passed = time.time() - t_begin
filler = ' '*(8-len(i.func_name))
print "[%s%s: timeout=%d, iosize=%s, seconds=%f]" % (repr(i.func_name), filler, timeout, size, seconds_passed)
Результат выполнения:
['cmd' : timeout=5, iosize=65535, seconds=0.016447]
['cmd_to' : timeout=5, iosize=65535, seconds=0.103022]
['cmd_totf': timeout=5, iosize=65535, seconds=0.107176]
['cmd' : timeout=5, iosize=65536, seconds=0.028105]
['cmd_to' : timeout=5, iosize=65536, seconds=5.116658]
['cmd_totf': timeout=5, iosize=65536, seconds=0.104905]
['cmd' : timeout=5, iosize=1048576, seconds=0.025964]
['cmd_to' : timeout=5, iosize=1048576, seconds=5.128062]
['cmd_totf': timeout=5, iosize=1048576, seconds=0.103183]
Ответы
Ответ 1
В отличие от всех предупреждений в документации по подпрограмме, тогда прямое чтение из process.stdout и process.stderr обеспечило лучшее решение.
К лучшему я имею в виду, что я могу читать результат процесса, который превышает 2 ^ 16 байтов, без необходимости временного хранения вывода на диске.
Следующий код:
import fcntl
import os
import subprocess
import time
def nonBlockRead(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.read()
except:
return ''
def cmd(cmdline, timeout=60):
"""
Execute cmdline, limit execution time to 'timeout' seconds.
Uses the subprocess module and subprocess.PIPE.
Raises TimeoutInterrupt
"""
p = subprocess.Popen(
cmdline,
bufsize = bufsize, # default value of 0 (unbuffered) is best
shell = False, # not really needed; it disabled by default
stdout = subprocess.PIPE,
stderr = subprocess.PIPE
)
t_begin = time.time() # Monitor execution time
seconds_passed = 0
stdout = ''
stderr = ''
while p.poll() is None and seconds_passed < timeout: # Monitor process
time.sleep(0.1) # Wait a little
seconds_passed = time.time() - t_begin
# p.std* blocks on read(), which messes up the timeout timer.
# To fix this, we use a nonblocking read()
# Note: Not sure if this is Windows compatible
stdout += nonBlockRead(p.stdout)
stderr += nonBlockRead(p.stderr)
if seconds_passed >= timeout:
try:
p.stdout.close() # If they are not closed the fds will hang around until
p.stderr.close() # os.fdlimit is exceeded and cause a nasty exception
p.terminate() # Important to close the fds prior to terminating the process!
# NOTE: Are there any other "non-freed" resources?
except:
pass
raise TimeoutInterrupt
returncode = p.returncode
return (returncode, stdout, stderr)
Ответ 2
Отказ от ответственности: этот ответ не протестирован в Windows и FreeBSD. Но используемые модули должны работать над этими системами. Я считаю, что это должен быть рабочий ответ на ваш вопрос - он работает для меня.
Здесь код, который я только что взломал, чтобы решить проблему на linux. Это комбинация нескольких потоков Stackoverflow и моих собственных исследований в документах Python 3.
Основные характеристики этого кода:
- Использует процессы не потоки для блокировки ввода-вывода, поскольку они могут более надежно быть p.terminated()
- Реализует перезагружаемый контрольный тайм-аут, который перезапускает подсчет, когда происходит какой-то выход
- Реализует долгосрочный сторожевой таймер для ограничения общей продолжительности выполнения
- Может работать в stdin (хотя мне нужно только кормить в однократных коротких строках)
- Может захватывать stdout/stderr обычным способом Popen (только stdout кодируется и stderr перенаправляется на stdout, но может быть легко разделен)
- Это почти в реальном времени, потому что он проверяет только каждые 0,2 секунды для вывода. Но вы можете уменьшить это или легко удалить интервал ожидания.
- Многие отладочные распечатки по-прежнему позволяют видеть, что происходит, когда.
Единственная зависимость кода - это перечисление, реализованное здесь, но код можно легко изменить без работы. Он используется только для выделения двух тайм-аутов - используйте отдельные исключения, если хотите.
Здесь код - как обычно - обратная связь очень ценится:
(Edit 29-Jun-2012 - теперь код действительно работает)
# Python module runcmd
# Implements a class to launch shell commands which
# are killed after a timeout. Timeouts can be reset
# after each line of output
#
# Use inside other script with:
#
# import runcmd
# (return_code, out) = runcmd.RunCmd(['ls', '-l', '/etc'],
# timeout_runtime,
# timeout_no_output,
# stdin_string).go()
#
import multiprocessing
import queue
import subprocess
import time
import enum
def timestamp():
return time.strftime('%Y%m%d-%H%M%S')
class ErrorRunCmd(Exception): pass
class ErrorRunCmdTimeOut(ErrorRunCmd): pass
class Enqueue_output(multiprocessing.Process):
def __init__(self, out, queue):
multiprocessing.Process.__init__(self)
self.out = out
self.queue = queue
self.daemon = True
def run(self):
try:
for line in iter(self.out.readline, b''):
#print('worker read:', line)
self.queue.put(line)
except ValueError: pass # Readline of closed file
self.out.close()
class Enqueue_input(multiprocessing.Process):
def __init__(self, inp, iterable):
multiprocessing.Process.__init__(self)
self.inp = inp
self.iterable = iterable
self.daemon = True
def run(self):
#print("writing stdin")
for line in self.iterable:
self.inp.write(bytes(line,'utf-8'))
self.inp.close()
#print("writing stdin DONE")
class RunCmd():
"""RunCmd - class to launch shell commands
Captures and returns stdout. Kills child after a given
amount (timeout_runtime) wallclock seconds. Can also
kill after timeout_retriggerable wallclock seconds.
This second timer is reset whenever the child does some
output
(return_code, out) = RunCmd(['ls', '-l', '/etc'],
timeout_runtime,
timeout_no_output,
stdin_string).go()
"""
Timeout = enum.Enum('No','Retriggerable','Runtime')
def __init__(self, cmd, timeout_runtime, timeout_retriggerable, stdin=None):
self.dbg = False
self.cmd = cmd
self.timeout_retriggerable = timeout_retriggerable
self.timeout_runtime = timeout_runtime
self.timeout_hit = self.Timeout.No
self.stdout = '--Cmd did not yield any output--'
self.stdin = stdin
def read_queue(self, q):
time_last_output = None
try:
bstr = q.get(False) # non-blocking
if self.dbg: print('{} chars read'.format(len(bstr)))
time_last_output = time.time()
self.stdout += bstr
except queue.Empty:
#print('queue empty')
pass
return time_last_output
def go(self):
if self.stdin:
pstdin = subprocess.PIPE
else:
pstdin = None
p = subprocess.Popen(self.cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=pstdin)
pin = None
if (pstdin):
pin = Enqueue_input(p.stdin, [self.stdin + '\n'])
pin.start()
q = multiprocessing.Queue()
pout = Enqueue_output(p.stdout, q)
pout.start()
try:
if self.dbg: print('Beginning subprocess with timeout {}/{} s on {}'.format(self.timeout_retriggerable, self.timeout_runtime, time.asctime()))
time_begin = time.time()
time_last_output = time_begin
seconds_passed = 0
self.stdout = b''
once = True # ensure loop executed at least once
# some child cmds may exit very fast, but still produce output
while once or p.poll() is None or not q.empty():
once = False
if self.dbg: print('a) {} of {}/{} secs passed and overall {} chars read'.format(seconds_passed, self.timeout_retriggerable, self.timeout_runtime, len(self.stdout)))
tlo = self.read_queue(q)
if tlo:
time_last_output = tlo
now = time.time()
if now - time_last_output >= self.timeout_retriggerable:
self.timeout_hit = self.Timeout.Retriggerable
raise ErrorRunCmdTimeOut(self)
if now - time_begin >= self.timeout_runtime:
self.timeout_hit = self.Timeout.Runtime
raise ErrorRunCmdTimeOut(self)
if q.empty():
time.sleep(0.1)
# Final try to get "last-millisecond" output
self.read_queue(q)
finally:
self._close(p, [pout, pin])
return (self.returncode, self.stdout)
def _close(self, p, procs):
if self.dbg:
if self.timeout_hit != self.Timeout.No:
print('{} A TIMEOUT occured: {}'.format(timestamp(), self.timeout_hit))
else:
print('{} No timeout occured'.format(timestamp()))
for process in [proc for proc in procs if proc]:
try:
process.terminate()
except:
print('{} Process termination raised trouble'.format(timestamp()))
raise
try:
p.stdin.close()
except: pass
if self.dbg: print('{} _closed stdin'.format(timestamp()))
try:
p.stdout.close() # If they are not closed the fds will hang around until
except: pass
if self.dbg: print('{} _closed stdout'.format(timestamp()))
#p.stderr.close() # os.fdlimit is exceeded and cause a nasty exception
try:
p.terminate() # Important to close the fds prior to terminating the process!
# NOTE: Are there any other "non-freed" resources?
except: pass
if self.dbg: print('{} _closed Popen'.format(timestamp()))
try:
self.stdout = self.stdout.decode('utf-8')
except: pass
self.returncode = p.returncode
if self.dbg: print('{} _closed all'.format(timestamp()))
Использовать с:
import runcmd
cmd = ['ls', '-l', '/etc']
worker = runcmd.RunCmd(cmd,
40, # limit runtime [wallclock seconds]
2, # limit runtime after last output [wallclk secs]
'' # stdin input string
)
(return_code, out) = worker.go()
if worker.timeout_hit != worker.Timeout.No:
print('A TIMEOUT occured: {}'.format(worker.timeout_hit))
else:
print('No timeout occured')
print("Running '{:s}' returned {:d} and {:d} chars of output".format(cmd, return_code, len(out)))
print('Output:')
print(out)
command
- первый аргумент - должен быть списком команды и ее аргументов. Он используется для вызова Popen(shell=False)
, а его таймауты - в секундах. В настоящее время нет кода для отключения тайм-аутов. Установите timeout_no_output
на time_runtime
, чтобы эффективно отключить перезагружаемый timeout_no_output
.
stdin_string
может быть любой строкой, которая должна быть отправлена на стандартный ввод команды. Установите None
, если ваша команда не нуждается в каком-либо вводе. Если строка указана, добавляется окончательный "\n".