Как я могу прочитать все доступные данные из подпроцесса .Popen.stdout(без блокировки)?

Мне нужен способ либо прочитать все доступные в настоящий момент символы в потоке, создаваемом Popen, либо узнать, сколько символов осталось в буфере.

Backround: Я хочу удаленно управлять интерактивным приложением в Python. До сих пор я использовал Popen для создания нового подпроцесса:

process=subprocess.Popen(["python"],shell=True,stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE, cwd=workingDir)

(Я действительно не запускаю python, но реальный интерактивный интерфейс похож.) В настоящий момент я читаю 1 байт, пока не обнаруживаю, что процесс достиг команды командной строки:

output = ""
while output[-6:]!="SCIP> ":
    output += process.stdout.read(1)
    sys.stdout.write(output[-1])
return output

Затем я начинаю длительное вычисление через process.stdin.write("command\n"). Моя проблема в том, что я не могу проверить, закончилось ли вычисление или нет, потому что я не могу проверить, являются ли последние символы в потоке приглашением или нет. read() или read(n) блокирует мой поток до тех пор, пока он не достигнет EOF, чего никогда не будет, потому что интерактивная программа не закончится до тех пор, пока не будет сказано. Поиск запроса в том, как этот цикл не работает, также не будет работать, потому что подсказка появится только после вычисления.

Идеальное решение позволило бы мне прочитать весь доступный символ из потока и сразу же вернуть пустую строку, если читать нечего.

Ответы

Ответ 1

Сотрясаясь, я нашел это действительно приятное решение

Постоянный подпроцесс python

что позволяет избежать проблемы блокировки, используя fcntl, чтобы установить атрибуты файлов в подпроцессных каналах в неблокирующий режим, не требуется никаких дополнительных потоков или опроса. Я мог что-то упустить, но он решил мою проблему с управлением интерактивным процессом.

Ответ 2

Инкрементный синтаксический анализ Stenout по-разному не является проблемой. Просто вставьте трубку в поток и попробуйте очистить ее, выбирая разделители. В зависимости от ваших предпочтений, он может передать его в другой канал/файл или поместить разобранные "куски" в "стек" в асинхронном режиме. Вот пример асинхронного "chunking" stdout на основе пользовательского разделителя:

import cStringIO
import uuid
import threading
import os

class InputStreamChunker(threading.Thread):
    '''
    Threaded object / code that mediates reading output from a stream,
    detects "separation markers" in the stream and spits out chunks
    of original stream, split when ends of chunk are encountered.

    Results are made available as a list of filled file-like objects
    (your choice). Results are accessible either "asynchronously"
    (you can poll at will for results in a non-blocking way) or
    "synchronously" by exposing a "subscribe and wait" system based
    on threading.Event flags.

    Usage:
    - instantiate this object
    - give our input pipe as "stdout" to other subprocess and start it:
        Popen(..., stdout = th.input, ...)
    - (optional) subscribe to data_available event
    - pull resulting file-like objects off .data
      (if you are "messing" with .data from outside of the thread,
       be curteous and wrap the thread-unsafe manipulations between:
       obj.data_unoccupied.clear()
       ... mess with .data
       obj.data_unoccupied.set()
       The thread will not touch obj.data for the duration and will
       block reading.)

    License: Public domain
    Absolutely no warranty provided
    '''
    def __init__(self, delimiter = None, outputObjConstructor = None):
        '''
        delimiter - the string that will be considered a delimiter for the stream
        outputObjConstructor - instanses of these will be attached to self.data array
         (intantiator_pointer, args, kw)
        '''
        super(InputStreamChunker,self).__init__()

        self._data_available = threading.Event()
        self._data_available.clear() # parent will .wait() on this for results.
        self._data = []
        self._data_unoccupied = threading.Event()
        self._data_unoccupied.set() # parent will set this to true when self.results is being changed from outside
        self._r, self._w = os.pipe() # takes all inputs. self.input = public pipe in.
        self._stop = False
        if not delimiter: delimiter = str(uuid.uuid1())
        self._stream_delimiter = [l for l in delimiter]
        self._stream_roll_back_len = ( len(delimiter)-1 ) * -1
        if not outputObjConstructor:
            self._obj = (cStringIO.StringIO, (), {})
        else:
            self._obj = outputObjConstructor
    @property
    def data_available(self):
        '''returns a threading.Event instance pointer that is
        True (and non-blocking to .wait() ) when we attached a
        new IO obj to the .data array.
        Code consuming the array may decide to set it back to False
        if it done with all chunks and wants to be blocked on .wait()'''
        return self._data_available
    @property
    def data_unoccupied(self):
        '''returns a threading.Event instance pointer that is normally
        True (and non-blocking to .wait() ) Set it to False with .clear()
        before you start non-thread-safe manipulations (changing) .data
        array. Set it back to True with .set() when you are done'''
        return self._data_unoccupied
    @property
    def data(self):
        '''returns a list of input chunkes (file-like objects) captured
        so far. This is a "stack" of sorts. Code consuming the chunks
        would be responsible for disposing of the file-like objects.
        By default, the file-like objects are instances of cStringIO'''
        return self._data
    @property
    def input(self):
        '''This is a file descriptor (not a file-like).
        It the input end of our pipe which you give to other process
        to be used as stdout pipe for that process'''
        return self._w
    def flush(self):
        '''Normally a read on a pipe is blocking.
        To get things moving (make the subprocess yield the buffer,
        we inject our chunk delimiter into self.input

        This is useful when primary subprocess does not write anything
        to our in pipe, but we need to make internal pipe reader let go
        of the pipe and move on with things.
        '''
        os.write(self._w, ''.join(self._stream_delimiter))
    def stop(self):
        self._stop = True
        self.flush() # reader has its teeth on the pipe. This makes it let go for for a sec.
        os.close(self._w)
        self._data_available.set()
    def __del__(self):
        try:
            self.stop()
        except:
            pass
        try:
            del self._w
            del self._r
            del self._data
        except:
            pass
    def run(self):
        ''' Plan:
        - We read into a fresh instance of IO obj until marker encountered.
        - When marker is detected, we attach that IO obj to "results" array
          and signal the calling code (through threading.Event flag) that
          results are available
        - repeat until .stop() was called on the thread.
        '''
        marker = ['' for l in self._stream_delimiter] # '' is there on purpose
        tf = self._obj[0](*self._obj[1], **self._obj[2])
        while not self._stop:
            l = os.read(self._r, 1)
            print('Thread talking: Ordinal of char is:%s' %ord(l))
            trash_str = marker.pop(0)
            marker.append(l)
            if marker != self._stream_delimiter:
                tf.write(l)
            else:
                # chopping off the marker first
                tf.seek(self._stream_roll_back_len, 2)
                tf.truncate()
                tf.seek(0)
                self._data_unoccupied.wait(5) # seriously, how much time is needed to get your items off the stack?
                self._data.append(tf)
                self._data_available.set()
                tf = self._obj[0](*self._obj[1], **self._obj[2])
        os.close(self._r)
        tf.close()
        del tf

def waitforresults(ch, answers, expect):
    while len(answers) < expect:
        ch.data_available.wait(0.5); ch.data_unoccupied.clear()
        while ch.data:
            answers.append(ch.data.pop(0))
        ch.data_available.clear(); ch.data_unoccupied.set()
        print('Main talking: %s answers received, expecting %s\n' % ( len(answers), expect) )

def test():
    '''
    - set up chunker
    - set up Popen with chunker output stream
    - push some data into proc.stdin
    - get results
    - cleanup
    '''

    import subprocess

    ch = InputStreamChunker('\n')
    ch.daemon = True
    ch.start()

    print('starting the subprocess\n')
    p = subprocess.Popen(
        ['cat'],
        stdin = subprocess.PIPE,
        stdout = ch.input,
        stderr = subprocess.PIPE)

    answers = []

    i = p.stdin
    i.write('line1 qwer\n') # will be in results
    i.write('line2 qwer\n') # will be in results
    i.write('line3 zxcv asdf') # will be in results only after a ch.flush(),
                                # prepended to other line or when the pipe is closed
    waitforresults(ch, answers, expect = 2)

    i.write('line4 tyui\n') # will be in results
    i.write('line5 hjkl\n') # will be in results
    i.write('line6 mnbv') # will be in results only after a ch.flush(),
                                # prepended to other line or when the pipe is closed
    waitforresults(ch, answers, expect = 4)

    ## now we will flush the rest of input (that last line did not have a delimiter)
    i.close()
    ch.flush()
    waitforresults(ch, answers, expect = 5)

    should_be = ['line1 qwer', 'line2 qwer',
        'line3 zxcv asdfline4 tyui', 'line5 hjkl', 'line6 mnbv']
    assert should_be == [i.read() for i in answers]

    # don't forget to stop the chunker. It it closes the pipes
    p.terminate()
    ch.stop()
    del p, ch

if __name__ == '__main__':
    test()

Изменить: удалить ошибочное словосочетание о том, что "запись в proc stdin - это одноразовая вещь"

Ответ 3

Есть и другое возможное решение, но может потребовать, чтобы вы немного изменили свою программу.

Если у вас есть несколько источников ввода-вывода (файловые дескрипторы, сокеты и т.д.), и вы хотите ждать их всех сразу, используйте Python select. Вы могли (например) поставить стандартный ввод (для чтения из терминала) и канал (из подпроцесса) в список и дождаться, когда вход будет готов на одном из них. select блокирует, пока I/O не будет доступен ни в одном из дескрипторов в списке. Затем вы просматриваете список, ищите те, у которых есть доступные данные.

Этот подход оказывается весьма эффективным - гораздо больше, чем опрос файлового дескриптора, чтобы увидеть, есть ли какие-либо данные. Он также имеет силу простоты; то есть вы можете выполнить то, что хотите, с минимальным кодом. Упрощенный код означает меньше возможностей для ошибок.

Ответ 4

Неправильно, что read() блокирует до EOF - он блокируется до тех пор, пока не получит достаточное количество данных, которое ему нужно - и с другой стороны возможно, что некоторые данные хранятся в буферах (он не сбрасывается только потому, что вы закончили печать с новой строкой).

Почему бы не попробовать в типе печати что-то вроде "### OVER ###\n", а затем stdout.flush(), а затем на родительской стороне собрать, пока вы не увидите токен OVER, скажем, с помощью ''.join(i for i in iter(process.stdout.readline, '### OVER ###\n'))

Ответ 5

Я пробовал много подходов, например, сделать неблокирующее stdout следующим:

fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

Но единственное рабочее решение описано здесь:

master, slave = pty.openpty()

proc = subprocess.Popen(
    shlex.split(command), 
    stdout=slave, 
    stderr=slave, 
    close_fds=True, 
    bufsize=0
)

stdout = os.fdopen(master)

И затем:

while True:
    out = stdout.readline()
    output_result = proc.poll()
    if out == '' and output_result is not None:
        break
    if out != '':
        print(out)

Ответ 6

Я не думаю, что readline() заблокирует ваш процесс.

line = process.stdout.readline()

Раньше я пытался использовать

for line in process.stdout:
    print(line)

но это похоже на зависание, пока процесс не завершится.