Как использовать subprocess.Popen для подключения нескольких процессов по каналам?

Как выполнить следующую команду оболочки с помощью модуля Python subprocess?

echo "input data" | awk -f script.awk | sort > outfile.txt

Входные данные будут поступать из строки, поэтому мне действительно не нужно echo. У меня так далеко, может ли кто-нибудь объяснить, как я получаю это для прохождения через sort тоже?

p_awk = subprocess.Popen(["awk","-f","script.awk"],
                          stdin=subprocess.PIPE,
                          stdout=file("outfile.txt", "w"))
p_awk.communicate( "input data" )

ОБНОВЛЕНИЕ. Обратите внимание: хотя принятый ответ ниже не отвечает на вопрос, как было задано, я считаю, что С. Лотт прав, и лучше избегать необходимости решать эту проблему в первую очередь

Ответы

Ответ 1

Вы были бы немного счастливее со следующим.

import subprocess

awk_sort = subprocess.Popen( "awk -f script.awk | sort > outfile.txt",
    stdin=subprocess.PIPE, shell=True )
awk_sort.communicate( b"input data\n" )

Делегировать часть работы в оболочку. Пусть он соединяет два процесса с конвейером.

Вы бы очень счастливо переписывали "script.awk" в Python, устраняя awk и конвейер.

Edit. Некоторые из причин предполагать, что awk не помогает.

[Слишком много причин для ответа через комментарии.]

  • Awk добавляет шаг без значимого значения. Нет ничего уникального в обработке awk, которую Python не обрабатывает.

  • Конвейерная обработка из awk для сортировки для больших наборов данных может улучшить прошедшее время обработки. Для кратких наборов данных он не имеет существенной выгоды. Быстрое измерение awk >file ; sort file и awk | sort покажет помощь concurrency. С помощью сортировки он редко помогает, потому что сортировка не является сквозным фильтром.

  • Простота обработки "Python для сортировки" (вместо "Python to awk to sort" ) предотвращает точный вид вопросов, задаваемых здесь.

  • Python - в то время как wordier, чем awk, также явно, где awk имеет определенные неявные правила, непрозрачные для новичков, и запутывает неспециалистов.

  • Awk (как и сама оболочка script) добавляет еще один язык программирования. Если все это можно сделать на одном языке (Python), устранение оболочки и программирование awk устраняют два языка программирования, позволяя кому-то сосредоточиться на составляющих значение частях задачи.

Нижняя строка: awk не может добавить значимое значение. В этом случае awk - это чистая стоимость; он добавил достаточно сложности, что необходимо было задать этот вопрос. Удаление awk будет чистой прибылью.

Боковая панель Почему создание конвейера (a | b) настолько сложно.

Когда оболочка сталкивается с a | b, она должна выполнить следующее.

  • Выполните дочерний процесс исходной оболочки. Это в конечном итоге станет b.

  • Создайте трубку os. (а не подпроцесс Python.PIPE), но вызовите os.pipe(), который возвращает два новых дескриптора файла, которые связаны через общий буфер. На этом этапе процесс имеет stdin, stdout, stderr от родителя, а также файл, который будет "stdout" и "b stdin".

  • Приведи ребенка. Ребенок заменяет его stdout новым stdout. Exec процесс a.

  • Закрытие b файла заменяет его stdin новым b stdin. Exec процесс b.

  • b ребенок ждет завершения.

  • Родитель ждет завершения b.

Я думаю, что приведенное выше можно использовать рекурсивно для появления a | b | c, но вы должны неявно заключать в скобки длинные конвейеры, рассматривая их так, как будто они a | (b | c).

Так как Python имеет os.pipe(), os.exec() и os.fork(), и вы можете заменить sys.stdin и sys.stdout, то есть способ сделать это в чистом Python. В самом деле, вы можете выработать несколько ярлыков, используя os.pipe() и subprocess.Popen.

Однако проще делегировать эту операцию оболочке.

Ответ 2

import subprocess

some_string = b'input_data'

sort_out = open('outfile.txt', 'wb', 0)
sort_in = subprocess.Popen('sort', stdin=subprocess.PIPE, stdout=sort_out).stdin
subprocess.Popen(['awk', '-f', 'script.awk'], stdout=sort_in, 
                 stdin=subprocess.PIPE).communicate(some_string)

Ответ 3

Чтобы эмулировать конвейер оболочки:

from subprocess import check_call

check_call('echo "input data" | a | b > outfile.txt', shell=True)

без вызова оболочки (см. 17.1.4.2. Замена оболочки оболочки):

#!/usr/bin/env python
from subprocess import Popen, PIPE

a = Popen(["a"], stdin=PIPE, stdout=PIPE)
with a.stdin:
    with a.stdout, open("outfile.txt", "wb") as outfile:
        b = Popen(["b"], stdin=a.stdout, stdout=outfile)
    a.stdin.write(b"input data")
statuses = [a.wait(), b.wait()] # both a.stdin/stdout are closed already

plumbum содержит некоторый синтаксический сахар:

#!/usr/bin/env python
from plumbum.cmd import a, b # magic

(a << "input data" | b > "outfile.txt")()

Аналог:

#!/bin/sh
echo "input data" | awk -f script.awk | sort > outfile.txt

является:

#!/usr/bin/env python
from plumbum.cmd import awk, sort

(awk["-f", "script.awk"] << "input data" | sort > "outfile.txt")()

Ответ 4

http://www.python.org/doc/2.5.2/lib/node535.html обрисовал это довольно хорошо. Есть ли какая-то часть этого, которую вы не понимаете?

Ваша программа будет довольно похожей, но вторая Popen будет иметь stdout = в файл, и вам не понадобится вывод ее .communicate().

Ответ 5

Вдохновленный ответом @Cristian. Я встретил ту же проблему, но с другой командой. Поэтому я помещаю свой испытанный пример, который, я считаю, может быть полезным:

grep_proc = subprocess.Popen(["grep", "rabbitmq"],
                             stdin=subprocess.PIPE, 
                             stdout=subprocess.PIPE)
subprocess.Popen(["ps", "aux"], stdout=grep_proc.stdin)
out, err = grep_proc.communicate()

Это проверено.

Что сделано

  • Объявлено ленивое выполнение grep с помощью stdin from pipe. Эта команда будет исполнена при выполнении команды ps, когда труба будет заполнена выводом ps.
  • Вызывается первичная команда ps с помощью stdout, направленной на канал, используемый командой grep.
  • Grep передал сообщение о выходе из трубы.

Мне нравится этот путь, потому что это естественная концепция трубы, аккуратно обернутая интерфейсами subprocess.

Ответ 6

Принятый ответ обходит проблему. Вот фрагмент кода, который объединяет выходные данные нескольких процессов: Обратите внимание, что он также печатает (несколько) эквивалентную команду оболочки, чтобы вы могли запустить ее и убедиться, что выходные данные верны.

#!/usr/bin/env python3

from subprocess import Popen, PIPE

# cmd1 : dd if=/dev/zero bs=1m count=100
# cmd2 : gzip
# cmd3 : wc -c
cmd1 = ['dd', 'if=/dev/zero', 'bs=1M', 'count=100']
cmd2 = ['tee']
cmd3 = ['wc', '-c']
print(f"Shell style : {' '.join(cmd1)} | {' '.join(cmd2)} | {' '.join(cmd3)}")

p1 = Popen(cmd1, stdout=PIPE, stderr=PIPE) # stderr=PIPE optional, dd is chatty
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE)
p3 = Popen(cmd3, stdin=p2.stdout, stdout=PIPE)

print("Output from last process : " + (p3.communicate()[0]).decode())

# thoretically p1 and p2 may still be running, this ensures we are collecting their return codes
p1.wait()
p2.wait()
print("p1 return: ", p1.returncode)
print("p2 return: ", p2.returncode)
print("p3 return: ", p3.returncode)

Ответ 7

РЕДАКТИРОВАТЬ: pipes доступен в Windows, но, что крайне важно, фактически не работает в Windows. См. Комментарии ниже.

Стандартная библиотека Python теперь включает в себя модуль pipes для этого:

https://docs.python.org/2/library/pipes.html, https://docs.python.org/3.4/library/pipes.html

Я не уверен, как долго этот модуль был вокруг, но этот подход выглядит намного проще, чем с помощью subprocess.

Ответ 8

Предыдущие ответы пропустили важный момент. Замена оболочки оболочки в основном правильна, как указано геокаром. Достаточно запустить communicate на последнем элементе трубы.

Остальная проблема заключается в передаче входных данных в конвейер. При использовании нескольких подпроцессов простой communicate(input_data) на последнем элементе не работает - он висит навсегда. Вам необходимо создать конвейер и дочерний элемент вручную следующим образом:

import os
import subprocess

input = """\
input data
more input
""" * 10

rd, wr = os.pipe()
if os.fork() != 0: # parent
    os.close(wr)
else:              # child
    os.close(rd)
    os.write(wr, input)
    os.close(wr)
    exit()

p_awk = subprocess.Popen(["awk", "{ print $2; }"],
                         stdin=rd,
                         stdout=subprocess.PIPE)
p_sort = subprocess.Popen(["sort"], 
                          stdin=p_awk.stdout,
                          stdout=subprocess.PIPE)
p_awk.stdout.close()
out, err = p_sort.communicate()
print (out.rstrip())

Теперь дочерний элемент предоставляет вход через канал, а родительские вызовы обмениваются(), который работает как ожидалось. При таком подходе вы можете создавать произвольные длинные конвейеры, не прибегая к "делегированию части работы оболочке". К сожалению, подпроцессная документация не упоминает об этом.

Существуют способы достижения такого же эффекта без труб:

from tempfile import TemporaryFile
tf = TemporaryFile()
tf.write(input)
tf.seek(0, 0)

Теперь используйте stdin=tf для p_awk. Это вопрос вкуса, который вы предпочитаете.

Вышеописанное по-прежнему не эквивалентно 100% конвейерам bash, потому что обработка сигналов отличается. Вы можете увидеть это, если вы добавите еще один элемент pipe, который усекает вывод sort, например. head -n 10. С помощью приведенного выше кода sort выведет сообщение об ошибке "Сломанная труба" на stderr. Вы не увидите это сообщение, когда вы запустите тот же конвейер в оболочке. (Единственное отличие, однако, результат в stdout тот же). Причина в том, что python Popen устанавливает SIG_IGN для SIGPIPE, тогда как оболочка оставляет его в SIG_DFL, а обработка сигналов sort отличается в этих двух случаях.

Ответ 9

Для меня приведенный ниже подход является самым чистым и легким для чтения

from subprocess import Popen, PIPE

def string_to_2_procs_to_file(input_s, first_cmd, second_cmd, output_filename):
    with open(output_filename, 'wb') as out_f:
        p2 = Popen(second_cmd, stdin=PIPE, stdout=out_f)
        p1 = Popen(first_cmd, stdout=p2.stdin, stdin=PIPE)
        p1.communicate(input=bytes(input_s))
        p1.wait()
        p2.stdin.close()
        p2.wait()

который можно назвать так:

string_to_2_procs_to_file('input data', ['awk', '-f', 'script.awk'], ['sort'], 'output.txt')