Как использовать subprocess.Popen для подключения нескольких процессов по каналам?
Как выполнить следующую команду оболочки с помощью модуля Python subprocess
?
echo "input data" | awk -f script.awk | sort > outfile.txt
Входные данные будут поступать из строки, поэтому мне действительно не нужно echo
. У меня так далеко, может ли кто-нибудь объяснить, как я получаю это для прохождения через sort
тоже?
p_awk = subprocess.Popen(["awk","-f","script.awk"],
stdin=subprocess.PIPE,
stdout=file("outfile.txt", "w"))
p_awk.communicate( "input data" )
ОБНОВЛЕНИЕ. Обратите внимание: хотя принятый ответ ниже не отвечает на вопрос, как было задано, я считаю, что С. Лотт прав, и лучше избегать необходимости решать эту проблему в первую очередь
Ответы
Ответ 1
Вы были бы немного счастливее со следующим.
import subprocess
awk_sort = subprocess.Popen( "awk -f script.awk | sort > outfile.txt",
stdin=subprocess.PIPE, shell=True )
awk_sort.communicate( b"input data\n" )
Делегировать часть работы в оболочку. Пусть он соединяет два процесса с конвейером.
Вы бы очень счастливо переписывали "script.awk" в Python, устраняя awk и конвейер.
Edit. Некоторые из причин предполагать, что awk не помогает.
[Слишком много причин для ответа через комментарии.]
-
Awk добавляет шаг без значимого значения. Нет ничего уникального в обработке awk, которую Python не обрабатывает.
-
Конвейерная обработка из awk для сортировки для больших наборов данных может улучшить прошедшее время обработки. Для кратких наборов данных он не имеет существенной выгоды. Быстрое измерение awk >file ; sort file
и awk | sort
покажет помощь concurrency. С помощью сортировки он редко помогает, потому что сортировка не является сквозным фильтром.
-
Простота обработки "Python для сортировки" (вместо "Python to awk to sort" ) предотвращает точный вид вопросов, задаваемых здесь.
-
Python - в то время как wordier, чем awk, также явно, где awk имеет определенные неявные правила, непрозрачные для новичков, и запутывает неспециалистов.
-
Awk (как и сама оболочка script) добавляет еще один язык программирования. Если все это можно сделать на одном языке (Python), устранение оболочки и программирование awk устраняют два языка программирования, позволяя кому-то сосредоточиться на составляющих значение частях задачи.
Нижняя строка: awk не может добавить значимое значение. В этом случае awk - это чистая стоимость; он добавил достаточно сложности, что необходимо было задать этот вопрос. Удаление awk будет чистой прибылью.
Боковая панель Почему создание конвейера (a | b
) настолько сложно.
Когда оболочка сталкивается с a | b
, она должна выполнить следующее.
-
Выполните дочерний процесс исходной оболочки. Это в конечном итоге станет b.
-
Создайте трубку os. (а не подпроцесс Python.PIPE), но вызовите os.pipe()
, который возвращает два новых дескриптора файла, которые связаны через общий буфер. На этом этапе процесс имеет stdin, stdout, stderr от родителя, а также файл, который будет "stdout" и "b stdin".
-
Приведи ребенка. Ребенок заменяет его stdout новым stdout. Exec процесс a
.
-
Закрытие b файла заменяет его stdin новым b stdin. Exec процесс b
.
-
b ребенок ждет завершения.
-
Родитель ждет завершения b.
Я думаю, что приведенное выше можно использовать рекурсивно для появления a | b | c
, но вы должны неявно заключать в скобки длинные конвейеры, рассматривая их так, как будто они a | (b | c)
.
Так как Python имеет os.pipe()
, os.exec()
и os.fork()
, и вы можете заменить sys.stdin
и sys.stdout
, то есть способ сделать это в чистом Python. В самом деле, вы можете выработать несколько ярлыков, используя os.pipe()
и subprocess.Popen
.
Однако проще делегировать эту операцию оболочке.
Ответ 2
import subprocess
some_string = b'input_data'
sort_out = open('outfile.txt', 'wb', 0)
sort_in = subprocess.Popen('sort', stdin=subprocess.PIPE, stdout=sort_out).stdin
subprocess.Popen(['awk', '-f', 'script.awk'], stdout=sort_in,
stdin=subprocess.PIPE).communicate(some_string)
Ответ 3
Чтобы эмулировать конвейер оболочки:
from subprocess import check_call
check_call('echo "input data" | a | b > outfile.txt', shell=True)
без вызова оболочки (см. 17.1.4.2. Замена оболочки оболочки):
#!/usr/bin/env python
from subprocess import Popen, PIPE
a = Popen(["a"], stdin=PIPE, stdout=PIPE)
with a.stdin:
with a.stdout, open("outfile.txt", "wb") as outfile:
b = Popen(["b"], stdin=a.stdout, stdout=outfile)
a.stdin.write(b"input data")
statuses = [a.wait(), b.wait()] # both a.stdin/stdout are closed already
plumbum
содержит некоторый синтаксический сахар:
#!/usr/bin/env python
from plumbum.cmd import a, b # magic
(a << "input data" | b > "outfile.txt")()
Аналог:
#!/bin/sh
echo "input data" | awk -f script.awk | sort > outfile.txt
является:
#!/usr/bin/env python
from plumbum.cmd import awk, sort
(awk["-f", "script.awk"] << "input data" | sort > "outfile.txt")()
Ответ 4
http://www.python.org/doc/2.5.2/lib/node535.html обрисовал это довольно хорошо. Есть ли какая-то часть этого, которую вы не понимаете?
Ваша программа будет довольно похожей, но вторая Popen
будет иметь stdout = в файл, и вам не понадобится вывод ее .communicate()
.
Ответ 5
Вдохновленный ответом @Cristian. Я встретил ту же проблему, но с другой командой. Поэтому я помещаю свой испытанный пример, который, я считаю, может быть полезным:
grep_proc = subprocess.Popen(["grep", "rabbitmq"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
subprocess.Popen(["ps", "aux"], stdout=grep_proc.stdin)
out, err = grep_proc.communicate()
Это проверено.
Что сделано
- Объявлено ленивое выполнение
grep
с помощью stdin from pipe. Эта команда будет исполнена при выполнении команды ps
, когда труба будет заполнена выводом ps
.
- Вызывается первичная команда
ps
с помощью stdout, направленной на канал, используемый командой grep
.
- Grep передал сообщение о выходе из трубы.
Мне нравится этот путь, потому что это естественная концепция трубы, аккуратно обернутая интерфейсами subprocess
.
Ответ 6
Принятый ответ обходит проблему. Вот фрагмент кода, который объединяет выходные данные нескольких процессов: Обратите внимание, что он также печатает (несколько) эквивалентную команду оболочки, чтобы вы могли запустить ее и убедиться, что выходные данные верны.
#!/usr/bin/env python3
from subprocess import Popen, PIPE
# cmd1 : dd if=/dev/zero bs=1m count=100
# cmd2 : gzip
# cmd3 : wc -c
cmd1 = ['dd', 'if=/dev/zero', 'bs=1M', 'count=100']
cmd2 = ['tee']
cmd3 = ['wc', '-c']
print(f"Shell style : {' '.join(cmd1)} | {' '.join(cmd2)} | {' '.join(cmd3)}")
p1 = Popen(cmd1, stdout=PIPE, stderr=PIPE) # stderr=PIPE optional, dd is chatty
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE)
p3 = Popen(cmd3, stdin=p2.stdout, stdout=PIPE)
print("Output from last process : " + (p3.communicate()[0]).decode())
# thoretically p1 and p2 may still be running, this ensures we are collecting their return codes
p1.wait()
p2.wait()
print("p1 return: ", p1.returncode)
print("p2 return: ", p2.returncode)
print("p3 return: ", p3.returncode)
Ответ 7
РЕДАКТИРОВАТЬ: pipes
доступен в Windows, но, что крайне важно, фактически не работает в Windows. См. Комментарии ниже.
Стандартная библиотека Python теперь включает в себя модуль pipes
для этого:
https://docs.python.org/2/library/pipes.html, https://docs.python.org/3.4/library/pipes.html
Я не уверен, как долго этот модуль был вокруг, но этот подход выглядит намного проще, чем с помощью subprocess
.
Ответ 8
Предыдущие ответы пропустили важный момент. Замена оболочки оболочки в основном правильна, как указано геокаром. Достаточно запустить communicate
на последнем элементе трубы.
Остальная проблема заключается в передаче входных данных в конвейер. При использовании нескольких подпроцессов простой communicate(input_data)
на последнем элементе не работает - он висит навсегда. Вам необходимо создать конвейер и дочерний элемент вручную следующим образом:
import os
import subprocess
input = """\
input data
more input
""" * 10
rd, wr = os.pipe()
if os.fork() != 0: # parent
os.close(wr)
else: # child
os.close(rd)
os.write(wr, input)
os.close(wr)
exit()
p_awk = subprocess.Popen(["awk", "{ print $2; }"],
stdin=rd,
stdout=subprocess.PIPE)
p_sort = subprocess.Popen(["sort"],
stdin=p_awk.stdout,
stdout=subprocess.PIPE)
p_awk.stdout.close()
out, err = p_sort.communicate()
print (out.rstrip())
Теперь дочерний элемент предоставляет вход через канал, а родительские вызовы обмениваются(), который работает как ожидалось. При таком подходе вы можете создавать произвольные длинные конвейеры, не прибегая к "делегированию части работы оболочке". К сожалению, подпроцессная документация не упоминает об этом.
Существуют способы достижения такого же эффекта без труб:
from tempfile import TemporaryFile
tf = TemporaryFile()
tf.write(input)
tf.seek(0, 0)
Теперь используйте stdin=tf
для p_awk
. Это вопрос вкуса, который вы предпочитаете.
Вышеописанное по-прежнему не эквивалентно 100% конвейерам bash, потому что обработка сигналов отличается. Вы можете увидеть это, если вы добавите еще один элемент pipe, который усекает вывод sort
, например. head -n 10
. С помощью приведенного выше кода sort
выведет сообщение об ошибке "Сломанная труба" на stderr
. Вы не увидите это сообщение, когда вы запустите тот же конвейер в оболочке. (Единственное отличие, однако, результат в stdout
тот же). Причина в том, что python Popen
устанавливает SIG_IGN
для SIGPIPE
, тогда как оболочка оставляет его в SIG_DFL
, а обработка сигналов sort
отличается в этих двух случаях.
Ответ 9
Для меня приведенный ниже подход является самым чистым и легким для чтения
from subprocess import Popen, PIPE
def string_to_2_procs_to_file(input_s, first_cmd, second_cmd, output_filename):
with open(output_filename, 'wb') as out_f:
p2 = Popen(second_cmd, stdin=PIPE, stdout=out_f)
p1 = Popen(first_cmd, stdout=p2.stdin, stdin=PIPE)
p1.communicate(input=bytes(input_s))
p1.wait()
p2.stdin.close()
p2.wait()
который можно назвать так:
string_to_2_procs_to_file('input data', ['awk', '-f', 'script.awk'], ['sort'], 'output.txt')