Python: передача и анализ потока данных в и из внешней программы с дополнительными входными и выходными файлами

Проблема: У меня плохо разработанная программа Fortran (я не могу ее изменить, я застрял с ней), которая берет текстовый ввод из stdin и других входных файлов и записывает результаты вывода текста в stdout и другие выходные файлы. Размер ввода и вывода довольно большой, и я хотел бы избежать записи на жесткий диск (медленная работа). Я написал функцию, которая выполняет итерацию по строкам нескольких входных файлов, и у меня также есть парсеры для множественного вывода. Я действительно не знаю, прочитала ли программа сначала все входные данные, а затем начинает выводить или запускает вывод при чтении ввода.

Цель: Чтобы иметь функцию, которая передает внешнюю программу с тем, что она хочет, и анализирует вывод, как он поступает из программы, без записи данных в текстовые файлы на жестком диске.

Исследование: Наивный способ использования файлов:

from subprocess import PIPE, Popen

def execute_simple(cmd, stdin_iter, stdout_parser, input_files, output_files):

    for filename, file_iter in input_files.iteritems():
        with open(filename ,'w') as f:
            for line in file_iter:
                f.write(line + '\n')


    p_sub = Popen(
        shlex.split(cmd),
        stdin = PIPE,
        stdout = open('stdout.txt', 'w'),
        stderr = open('stderr.txt', 'w'),
        bufsize=1
    )
    for line in stdin_iter:
        p_sub.stdin.write(line + '\n')

    p_sub.stdin.close()
    p_sub.wait()

    data = {}
    for filename, parse_func in output_files.iteritems():
        # The stdout.txt and stderr.txt is included here
        with open(filename,'r') as f:
            data[filename] = parse_func(
                    iter(f.readline, b'')
            )
    return data

Я попытался и subprocess модуль для совместной работы внешней программы. Дополнительные файлы ввода/вывода обрабатываются с именованными каналами и multiprocessing. Я хочу передать stdin с помощью итератора (который возвращает строки для ввода), сохраните stderr в списке и проанализируйте stdout, как он поступает из внешней программы. Вход и выход могут быть довольно большими, поэтому использование communicate невозможно.

У меня есть синтаксический анализатор в формате:

def parser(iterator):
    for line in iterator:
        # Do something
        if condition:
            break
    some_other_function(iterator)
    return data

Я посмотрел на это решение, используя select, чтобы выбрать соответствующий поток, однако я не знаю, как заставить его работать с моим парсером stdout и как кормить stdin.

Я также смотрю asyncio модуль, но, как я вижу, у меня будет такая же проблема с синтаксическим анализом толстого.

Ответы

Ответ 1

Вы должны использовать именованные каналы для всех входных и выходных данных в программу Fortran, чтобы избежать записи на диск. Затем у вашего потребителя вы можете использовать потоки для чтения из каждого из выходных источников программы и добавлять информацию в очередь для обработки в порядке.

Чтобы смоделировать это, я создал приложение python daemon.py, которое читает со стандартного ввода и возвращает квадратный корень до EOF. Он записывает все входные данные в файл журнала, указанный в качестве аргумента командной строки, и печатает квадратный корень в stdout и все ошибки в stderr. Я думаю, что он имитирует вашу программу (конечно, количество выходных файлов - только одно, но его можно масштабировать). Вы можете просмотреть исходный код этого тестового приложения здесь. Обратите внимание на явный вызов stdout.flush(). По умолчанию стандартный вывод буферизируется, что означает, что он выводится в конце и сообщения не поступают по порядку. Надеюсь, ваше приложение Fortran не будет буферировать его вывод. Я считаю, что мое примерное приложение, вероятно, не будет работать в Windows из-за использования Unix только select, что не имеет значения в вашем случае.

У меня есть мое потребительское приложение, которое запускает приложение-демон в качестве подпроцесса: stdin, stdout и stderr перенаправляются на subprocess.PIPE s. каждая из этих труб передается в другой поток, один для ввода и три для обработки файла журнала, ошибок и стандартного вывода соответственно. Все они добавляют свои сообщения в общий Queue, который ваш основной поток читает и отправляет вашему парсеру.

Это мой потребительский код:

import os, random, time
import subprocess
import threading
import Queue
import atexit

def setup():
    # make a named pipe for every file the program should write
    logfilepipe='logpipe'
    os.mkfifo(logfilepipe)

def cleanup():
    # put your named pipes here to get cleaned up
    logfilepipe='logpipe'
    os.remove(logfilepipe)

# run our cleanup code no matter what - avoid leaving pipes laying around
# even if we terminate early with Ctrl-C
atexit.register(cleanup)

# My example iterator that supplies input for the program. You already have an iterator 
# so don't worry about this. It just returns a random input from the sample_data list
# until the maximum number of iterations is reached.
class MyIter():
    sample_data=[0,1,2,4,9,-100,16,25,100,-8,'seven',10000,144,8,47,91,2.4,'^',56,18,77,94]
    def __init__(self, numiterations=1000):
        self.numiterations=numiterations
        self.current = 0

    def __iter__(self):
        return self

    def next(self):
        self.current += 1
        if self.current > self.numiterations:
            raise StopIteration
        else:
            return random.choice(self.__class__.sample_data)

# Your parse_func function - I just print it out with a [tag] showing its source.
def parse_func(source,line):
    print "[%s] %s" % (source,line)

# Generic function for sending standard input to the problem.
# p - a process handle returned by subprocess
def input_func(p, queue):
    # run the command with output redirected
    for line in MyIter(30): # Limit for testing purposes
        time.sleep(0.1) # sleep a tiny bit
        p.stdin.write(str(line)+'\n')
        queue.put(('INPUT', line))
    p.stdin.close()
    p.wait()

    # Once our process has ended, tell the main thread to quit
    queue.put(('QUIT', True))

# Generic function for reading output from the program. source can either be a
# named pipe identified by a string, or subprocess.PIPE for stdout and stderr.
def read_output(source, queue, tag=None):
    print "Starting to read output for %r" % source
    if isinstance(source,str):
        # Is a file or named pipe, so open it
        source=open(source, 'r') # open file with string name
    line = source.readline()
    # enqueue and read lines until EOF
    while line != '':
        queue.put((tag, line.rstrip()))
        line = source.readline()

if __name__=='__main__':
    cmd='daemon.py'

    # set up our FIFOs instead of using files - put file names into setup() and cleanup()
    setup()

    logfilepipe='logpipe'

    # Message queue for handling all output, whether it stdout, stderr, or a file output by our command
    lq = Queue.Queue()

    # open the subprocess for command
    print "Running command."
    p = subprocess.Popen(['/path/to/'+cmd,logfilepipe],
                                    stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Start threads to handle the input and output
    threading.Thread(target=input_func, args=(p, lq)).start()
    threading.Thread(target=read_output, args=(p.stdout, lq, 'OUTPUT')).start()
    threading.Thread(target=read_output, args=(p.stderr, lq, 'ERRORS')).start()

    # open a thread to read any other output files (e.g. log file) as named pipes
    threading.Thread(target=read_output, args=(logfilepipe, lq, 'LOG')).start()

    # Now combine the results from our threads to do what you want
    run=True
    while(run):
        (tag, line) = lq.get()
        if tag == 'QUIT':
            run=False
        else:
            parse_func(tag, line)

Мой итератор возвращает случайное значение ввода (некоторые из которых являются нежелательными для возникновения ошибок). Ваша замена должна быть заменой. Программа будет работать до конца ее ввода, а затем дождаться завершения подпроцесса до того, как вы добавите сообщение QUIT в свой основной поток. Мой parse_func, очевидно, очень прост, просто распечатывает сообщение и его источник, но вы должны иметь возможность работать с чем-то. Функция чтения из источника вывода предназначена для работы как с PIPE, так и с цепочками - не открывайте трубы в основном потоке, потому что они будут блокироваться до тех пор, пока не будет доступен вход. Поэтому для чтения файлов (например, чтение файлов журналов) лучше иметь дочерний поток, открывающий файл и блокировку. Однако мы создаем подпроцесс в основном потоке, чтобы мы могли передавать дескрипторы для stdin, stdout и stderr в соответствующие дочерние потоки.

Основываясь на на этой версии Python многопользовательской версии.

Ответ 2

Очень важно, чтобы программа Fortran вызывала флеш в конце каждого задания (может также быть чашечкой), если вы ожидаете окончания результатов перед отправкой нового задания. < ш > Команда зависит от компилятора, например. GNU fortran CALL FLUSH(unitnumber), или его можно моделировать, закрывая outpud и открывая снова для добавления.

Вы также можете легко написать несколько пустых строк со многими пробельными символами в конце, чтобы заполнить размер буфера, и вы получите новый блок данных. 5000 символов пробелов, вероятно, достаточно хороши, но не слишком много, что он заблокирует сторону Fortran трубы. Если вы прочитаете эти пустые строки сразу после отправки нового задания, вам даже не потребуется чтение без блокировки. Последняя строка задания может быть легко распознана в числовых приложениях. Если вы напишете приложение "чат", вам нужно что-то, что писали другие люди.