Как получить результат из программы, которая использует перерисовку экрана для использования в скребке экрана терминала?

Я пытаюсь получить вывод полноэкранной программы терминалов, которая использует перерисовки эвакуационных кодов для представления данных и для чего требуется tty (или pty).

Основная процедура, которой должен следовать человек:

  • Запустите программу в терминале.
  • Программа использует перерисовку для отображения и обновления различных полей данных.
  • Человек ждет, пока дисплей не будет согласован (возможно, используя такие сигналы, как "он не мерцает" или "он равен 0,5 с момента последнего обновления" ).
  • Человек смотрит на поля в определенных положениях и запоминает или записывает данные.
  • Человек выходит из программы.
  • Затем человек выполняет действия вне программы на основе этих данных.

Я хотел бы автоматизировать этот процесс. Шаги 4 и 5 могут быть выполнены в любом порядке. Хотя перфекционист во мне беспокоится о самосогласованности состояния экрана, я признаю, что я не уверен, как правильно его определить (кроме, быть может, использовать "это был больше, чем определенный период времени с момента последнего обновления" ).

Похоже, что использование pty и subprocess, за которым следует какой-то экранный скребок, является одним из возможных способов сделать это, но я не совсем понимаю, как использовать их все вместе, и какие опасности существуют с некоторыми из объекты нижнего уровня, которые я использую.

Рассмотрим эту программу:

#!/usr/bin/env python2
import os
import pty
import subprocess
import time

import pexpect.ANSI

# Psuedo-terminal FDs
fd_master, fd_slave = pty.openpty()

# Start 'the_program'
the_proc = subprocess.Popen(['the_program'], stdin=fd_master, stdout=fd_slave, stderr=fd_slave)

# Just kill it after a couple of seconds
time.sleep(2)
the_proc.terminate()

# Read output into a buffer
output_buffer = b''
read_size = None

while (read_size is None) or (read_size > 0):
    chunk = os.read(fd_master, 1024)
    output_buffer += chunk
    read_size = len(chunk)

print("output buffer size: {:d}".format(len(output_buffer)))

# Feed output to screen scraper
ansi_term = pexpect.ANSI.ANSI(24, 80)
ansi_term.write(output_buffer)

# Parse presented data... 

Одна проблема заключается в том, что блоки вызовов os.read() всегда. Мне также интересно, есть ли лучший способ получить выход pty для дальнейшего использования. В частности:

  • Есть ли способ сделать это (или его части) с помощью кода более высокого уровня? Я не могу просто использовать subprocess.PIPE для моего вызова Popen, потому что тогда целевая программа не будет работать. Но могу ли я обернуть эти дескрипторы файлов чем-то более удобными способами для ввода/вывода?
  • Если нет, как мне избежать блокировки во время вызова os.read? Я больше привык к файлоподобным объектам, где read() всегда возвращает, и просто возвращает пустую строку, если конец потока достигнут. Здесь os.read в конечном итоге блокирует независимо от того, что.
  • Я опасаюсь заставить этот script "просто работать", не осознавая потенциальных опасностей (например, условия гонки, которые появляются один раз в тысячу). Что еще мне нужно знать?

Я также открываю мысль, что использование pty и subprocess в первую очередь не лучший способ сделать это.

Ответы

Ответ 1

Если программа не генерирует много выходных данных; самый простой способ - использовать pexpect.run(), чтобы получить свой результат через pty:

import pexpect # $ pip install pexpect

output, status = pexpect.run('top', timeout=2, withexitstatus=1)

Вы можете определить, будет ли выходной сигнал "оседать", сравнив его с предыдущим выходом:

import pexpect # $ pip install pexpect

def every_second(d, last=[None]):
    current = d['child'].before
    if last[0] == current: # "settled down"
        raise pexpect.TIMEOUT(None) # exit run
    last[0] = current

output, status =  pexpect.run('top', timeout=1, withexitstatus=1,
                              events={pexpect.TIMEOUT: every_second})

Вы можете использовать регулярное выражение, которое соответствует рекуррентному шаблону на выходе вместо таймаута. Цель состоит в том, чтобы определить, когда выход "улажен".

Здесь для сравнения используется код, который напрямую использует модули subprocess и pty:

#!/usr/bin/env python
"""Start process; wait 2 seconds; kill the process; print all process output."""
import errno
import os
import pty
import select
from subprocess import Popen, STDOUT
try:
    from time import monotonic as timer
except ImportError:
    from time import time as timer

output = []
master_fd, slave_fd = pty.openpty() #XXX add cleanup on exception
p = Popen(["top"], stdin=slave_fd, stdout=slave_fd, stderr=STDOUT,
          close_fds=True)
os.close(slave_fd)
endtime = timer() + 2 # stop in 2 seconds
while True:
    delay = endtime - timer()
    if delay <= 0: # timeout
        break
    if select.select([master_fd], [], [], delay)[0]:
        try:
            data = os.read(master_fd, 1024)
        except OSError as e: #NOTE: no need for IOError here
            if e.errno != errno.EIO:
                raise
            break # EIO means EOF on some systems
        else:
            if not data: # EOF
                break
            output.append(data)
os.close(master_fd)
p.terminate()
returncode = p.wait()
print([returncode, b''.join(output)])

Примечание:

  • все три стандартных потока в дочернем процессе используют slave_fd в отличие от кода в вашем ответе, который использует master_fd для stdin
  • код считывает вывод, пока процесс все еще запущен. Он позволяет принимать большой вывод (больше, чем размер одного буфера в ядре).
  • код не потеряет данные об ошибке EIO (здесь означает EOF)

На основе Подпроцессы подпроцесса Python() зависает.

Ответ 2

Вы можете использовать pexpect для этого. Используйте функцию run() для получения данных и см. Включенный эмулятор VT100 (или pyte) для его рендеринга.

Используя утилиту top в качестве примера:

import time
import pexpect
import pexpect.ANSI

# Start 'top' and quit after a couple of seconds
output_buffer = pexpect.run('top', timeout=2)

# For continuous reading/interaction, you would need to use the "events"
# arg, threading, or a framework for asynchronous communication.

ansi_term = pexpect.ANSI.ANSI(24, 80)
ansi_term.write(output_buffer)
print(str(ansi_term))

(Обратите внимание, что есть ошибка, которая иногда приводит к дополнительным расстояниям строк.)