Обрабатывать большое количество данных для stdin при использовании подпроцесса. Popen
Я как бы пытаюсь понять, что такое python для решения этой простой проблемы.
Моя проблема довольно проста. Если вы используете следующий код, он будет висеть. Это хорошо документировано в документе subprocess doc.
import subprocess
proc = subprocess.Popen(['cat','-'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
for i in range(100000):
proc.stdin.write('%d\n' % i)
output = proc.communicate()[0]
print output
Поиск решения (есть очень проницательный поток, но я потерял его сейчас). Я нашел это решение (среди прочих), которое использует явную fork:
import os
import sys
from subprocess import Popen, PIPE
def produce(to_sed):
for i in range(100000):
to_sed.write("%d\n" % i)
to_sed.flush()
#this would happen implicitly, anyway, but is here for the example
to_sed.close()
def consume(from_sed):
while 1:
res = from_sed.readline()
if not res:
sys.exit(0)
#sys.exit(proc.poll())
print 'received: ', [res]
def main():
proc = Popen(['cat','-'],stdin=PIPE,stdout=PIPE)
to_sed = proc.stdin
from_sed = proc.stdout
pid = os.fork()
if pid == 0 :
from_sed.close()
produce(to_sed)
return
else :
to_sed.close()
consume(from_sed)
if __name__ == '__main__':
main()
В то время как это решение концептуально очень легко понять, оно использует еще один процесс и задерживается как слишком низкий уровень по сравнению с модулем подпроцесса (это просто для того, чтобы скрыть такие вещи...).
Мне интересно: существует ли простое и чистое решение с использованием модуля подпроцесса, который не будет висели или для реализации этого теста. Мне нужно сделать шаг назад и реализовать цикл выбора в старом стиле или явный fork?
Спасибо
Ответы
Ответ 1
Если вы хотите чистое решение Python, вам нужно поместить либо читателя, либо автора в отдельный поток. Пакет threading
- это легкий способ сделать это, с удобным доступом к обычным объектам и бесполезным форкированием.
import subprocess
import threading
import sys
proc = subprocess.Popen(['cat','-'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
def writer():
for i in range(100000):
proc.stdin.write('%d\n' % i)
proc.stdin.close()
thread = threading.Thread(target=writer)
thread.start()
for line in proc.stdout:
sys.stdout.write(line)
thread.join()
proc.wait()
Может показаться, что модуль subprocess
модернизирован для поддержки потоков и сопрограмм, что позволит конструировать трубопроводы, которые смешивают кусочки Python и куски оболочки, более элегантно.
Ответ 2
Если вы не хотите хранить все данные в памяти, вам нужно использовать select. Например. что-то вроде:
import subprocess
from select import select
import os
proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
i = 0;
while True:
rlist, wlist, xlist = [proc.stdout], [], []
if i < 100000:
wlist.append(proc.stdin)
rlist, wlist, xlist = select(rlist, wlist, xlist)
if proc.stdout in rlist:
out = os.read(proc.stdout.fileno(), 10)
print out,
if not out:
break
if proc.stdin in wlist:
proc.stdin.write('%d\n' % i)
i += 1
if i >= 100000:
proc.stdin.close()
Ответ 3
Здесь что-то я использовал для загрузки загружаемых файлов mysql 6G через подпроцесс. Держитесь подальше от оболочки = True. Небезопасно и начать с ресурсов, тратящих ресурсы.
import subprocess
fhandle = None
cmd = [mysql_path,
"-u", mysql_user, "-p" + mysql_pass],
"-h", host, database]
fhandle = open(dump_file, 'r')
p = subprocess.Popen(cmd, stdin=fhandle, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout,stderr) = p.communicate()
fhandle.close()
Ответ 4
Для такого рода вещей оболочка работает намного лучше, чем подпроцесс.
Напишите очень простые приложения Python, которые читаются с sys.stdin
и пишут sys.stdout
.
Соедините простые приложения вместе с конвейером оболочки.
Если вы хотите, запустите конвейер с помощью subprocess
или просто напишите однострочную оболочку script.
python part1.py | python part2.py
Это очень, очень эффективно. Он также переносится на все Linux (и Windows), пока вы держите его очень просто.
Ответ 5
Ваши блокировки кода, как только cat
stdout OS buffer buffer заполнен. Если вы используете stdout=PIPE
; вы должны использовать его вовремя, иначе может возникнуть тупик, как в вашем случае.
Если вам не нужен вывод во время выполнения процесса; вы можете перенаправить его во временный файл:
#!/usr/bin/env python3
import subprocess
import tempfile
with tempfile.TemporaryFile('r+') as output_file:
with subprocess.Popen(['cat'],
stdin=subprocess.PIPE,
stdout=output_file,
universal_newlines=True) as process:
for i in range(100000):
print(i, file=process.stdin)
output_file.seek(0) # rewind (and sync with the disk)
print(output_file.readline(), end='') # get the first line of the output
Если вход/выход мал (поместится в память); вы можете передать вход все сразу и получить вывод сразу, используя .communicate()
, который одновременно читает/записывает:
#!/usr/bin/env python3
import subprocess
cp = subprocess.run(['cat'], input='\n'.join(['%d' % i for i in range(100000)]),
stdout=subprocess.PIPE, universal_newlines=True)
print(cp.stdout.splitlines()[-1]) # print the last line
Чтобы читать/писать одновременно вручную, вы можете использовать потоки, asyncio, fcntl и т.д. @Jed предоставил простой поток-решение. Здесь решение на основе asyncio
:
#!/usr/bin/env python3
import asyncio
import sys
from subprocess import PIPE
async def pump_input(writer):
try:
for i in range(100000):
writer.write(b'%d\n' % i)
await writer.drain()
finally:
writer.close()
async def run():
# start child process
# NOTE: universal_newlines parameter is not supported
process = await asyncio.create_subprocess_exec('cat', stdin=PIPE, stdout=PIPE)
asyncio.ensure_future(pump_input(process.stdin)) # write input
async for line in process.stdout: # consume output
print(int(line)**2) # print squares
return await process.wait() # wait for the child process to exit
if sys.platform.startswith('win'):
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()
В Unix вы можете использовать решение на основе fcntl
:
#!/usr/bin/env python3
import sys
from fcntl import fcntl, F_GETFL, F_SETFL
from os import O_NONBLOCK
from shutil import copyfileobj
from subprocess import Popen, PIPE, _PIPE_BUF as PIPE_BUF
def make_blocking(pipe, blocking=True):
fd = pipe.fileno()
if not blocking:
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK) # set O_NONBLOCK
else:
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK) # clear it
with Popen(['cat'], stdin=PIPE, stdout=PIPE) as process:
make_blocking(process.stdout, blocking=False)
with process.stdin:
for i in range(100000):
#NOTE: the mode is block-buffered (default) and therefore
# `cat` won't see it immidiately
process.stdin.write(b'%d\n' % i)
# a deadblock may happen here with a *blocking* pipe
output = process.stdout.read(PIPE_BUF)
if output is not None:
sys.stdout.buffer.write(output)
# read the rest
make_blocking(process.stdout)
copyfileobj(process.stdout, sys.stdout.buffer)
Ответ 6
Вот пример (Python 3) чтения одной записи за раз из gzip с использованием протокола:
cmd = 'gzip -dc compressed_file.gz'
pipe = Popen(cmd, stdout=PIPE).stdout
for line in pipe:
print(":", line.decode(), end="")
Я знаю, что для этого есть стандартный модуль, это просто пример. Вы можете прочитать весь вывод за один раз (например, обратно-тики оболочки) с использованием метода связи, но, очевидно, вы должны быть осторожны с размером памяти.
Вот пример (снова Python 3) записи записей в программу lp (1) в Linux:
cmd = 'lp -'
proc = Popen(cmd, stdin=PIPE)
proc.communicate(some_data.encode())
Ответ 7
Теперь я знаю, что это не будет полностью удовлетворять вас пуристом, так как вход должен будет поместиться в память, и у вас нет возможности работать в интерактивном режиме с вводом-выводом, но по крайней мере это отлично работает на вашем примере, Метод связи необязательно принимает входные данные в качестве аргумента, и если вы подаете свой процесс таким образом, он будет работать.
import subprocess
proc = subprocess.Popen(['cat','-'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
input = "".join('{0:d}\n'.format(i) for i in range(100000))
output = proc.communicate(input)[0]
print output
Что касается более крупной проблемы, вы можете подклассифицировать Popen, переписать __init__
, чтобы принять потоковые объекты в качестве аргументов в stdin, stdout, stderr и переписать метод _communicate
(волосатый для кроссплатформенности, вам нужно это сделать дважды, см. источник subprocess.py), чтобы вызвать read() в потоке stdin и записать() вывод в потоки stdout и stderr. Меня беспокоит такой подход: насколько я знаю, это еще не сделано. Когда очевидные вещи раньше не делались, обычно есть причина (она не работает должным образом), но я не понимаю, почему это не так, кроме того, что потоки должны быть потокобезопасными в Windows.
Ответ 8
Использование aiofiles и asyncio в python 3.5:
Немного сложный, но для записи в stdin требуется только 1024 байта!
import asyncio
import aiofiles
import sys
from os.path import dirname, join, abspath
import subprocess as sb
THIS_DIR = abspath(dirname(__file__))
SAMPLE_FILE = join(THIS_DIR, '../src/hazelnut/tests/stuff/sample.mp4')
DEST_PATH = '/home/vahid/Desktop/sample.mp4'
async def async_file_reader(f, buffer):
async for l in f:
if l:
buffer.append(l)
else:
break
print('reader done')
async def async_file_writer(source_file, target_file):
length = 0
while True:
input_chunk = await source_file.read(1024)
if input_chunk:
length += len(input_chunk)
target_file.write(input_chunk)
await target_file.drain()
else:
target_file.write_eof()
break
print('writer done: %s' % length)
async def main():
dir_name = dirname(DEST_PATH)
remote_cmd = 'ssh localhost mkdir -p %s && cat - > %s' % (dir_name, DEST_PATH)
stdout, stderr = [], []
async with aiofiles.open(SAMPLE_FILE, mode='rb') as f:
cmd = await asyncio.create_subprocess_shell(
remote_cmd,
stdin=sb.PIPE,
stdout=sb.PIPE,
stderr=sb.PIPE,
)
await asyncio.gather(*(
async_file_reader(cmd.stdout, stdout),
async_file_reader(cmd.stderr, stderr),
async_file_writer(f, cmd.stdin)
))
print('EXIT STATUS: %s' % await cmd.wait())
stdout, stderr = '\n'.join(stdout), '\n'.join(stderr)
if stdout:
print(stdout)
if stderr:
print(stderr, file=sys.stderr)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Результат:
writer done: 383631
reader done
reader done
EXIT STATUS: 0
Ответ 9
Простейшее решение, о котором я могу думать:
from subprocess import Popen, PIPE
from threading import Thread
s = map(str,xrange(10000)) # a large string
p = Popen(['cat'], stdin=PIPE, stdout=PIPE, bufsize=1)
Thread(target=lambda: any((p.stdin.write(b) for b in s)) or p.stdin.close()).start()
print (p.stdout.read())
Буферизованная версия:
from subprocess import Popen, PIPE
from threading import Thread
s = map(str,xrange(10000)) # a large string
n = 1024 # buffer size
p = Popen(['cat'], stdin=PIPE, stdout=PIPE, bufsize=n)
Thread(target=lambda: any((p.stdin.write(c) for c in (s[i:i+n] for i in xrange(0, len(s), n)))) or p.stdin.close()).start()
print (p.stdout.read())
Ответ 10
Я искал примерный код для постепенного итерации процесса по мере того, как этот процесс потребляет свой вход от предоставленного итератора (поэтапно также). В основном:
import string
import random
# That what I consider a very useful function, though didn't
# find any existing implementations.
def process_line_reader(args, stdin_lines):
# args - command to run, same as subprocess.Popen
# stdin_lines - iterable with lines to send to process stdin
# returns - iterable with lines received from process stdout
pass
# Returns iterable over n random strings. n is assumed to be infinity if negative.
# Just an example of function that returns potentially unlimited number of lines.
def random_lines(n, M=8):
while 0 != n:
yield "".join(random.choice(string.letters) for _ in range(M))
if 0 < n:
n -= 1
# That what I consider to be a very convenient use case for
# function proposed above.
def print_many_uniq_numbered_random_lines():
i = 0
for line in process_line_reader(["uniq", "-i"], random_lines(100500 * 9000)):
# Key idea here is that `process_line_reader` will feed random lines into
# `uniq` process stdin as lines are consumed from returned iterable.
print "#%i: %s" % (i, line)
i += 1
Некоторые предлагаемые здесь решения позволяют делать это с помощью потоков (но это не всегда удобно) или с asyncio (что недоступно в Python 2.x). Ниже приведен пример рабочей реализации, которая позволяет это сделать.
import subprocess
import os
import fcntl
import select
class nonblocking_io(object):
def __init__(self, f):
self._fd = -1
if type(f) is int:
self._fd = os.dup(f)
os.close(f)
elif type(f) is file:
self._fd = os.dup(f.fileno())
f.close()
else:
raise TypeError("Only accept file objects or interger file descriptors")
flag = fcntl.fcntl(self._fd, fcntl.F_GETFL)
fcntl.fcntl(self._fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
return False
def fileno(self):
return self._fd
def close(self):
if 0 <= self._fd:
os.close(self._fd)
self._fd = -1
class nonblocking_line_writer(nonblocking_io):
def __init__(self, f, lines, autoclose=True, buffer_size=16*1024, encoding="utf-8", linesep=os.linesep):
super(nonblocking_line_writer, self).__init__(f)
self._lines = iter(lines)
self._lines_ended = False
self._autoclose = autoclose
self._buffer_size = buffer_size
self._buffer_offset = 0
self._buffer = bytearray()
self._encoding = encoding
self._linesep = bytearray(linesep, encoding)
# Returns False when `lines` iterable is exhausted and all pending data is written
def continue_writing(self):
while True:
if self._buffer_offset < len(self._buffer):
n = os.write(self._fd, self._buffer[self._buffer_offset:])
self._buffer_offset += n
if self._buffer_offset < len(self._buffer):
return True
if self._lines_ended:
if self._autoclose:
self.close()
return False
self._buffer[:] = []
self._buffer_offset = 0
while len(self._buffer) < self._buffer_size:
line = next(self._lines, None)
if line is None:
self._lines_ended = True
break
self._buffer.extend(bytearray(line, self._encoding))
self._buffer.extend(self._linesep)
class nonblocking_line_reader(nonblocking_io):
def __init__(self, f, autoclose=True, buffer_size=16*1024, encoding="utf-8"):
super(nonblocking_line_reader, self).__init__(f)
self._autoclose = autoclose
self._buffer_size = buffer_size
self._encoding = encoding
self._file_ended = False
self._line_part = ""
# Returns (lines, more) tuple, where lines is iterable with lines read and more will
# be set to False after EOF.
def continue_reading(self):
lines = []
while not self._file_ended:
data = os.read(self._fd, self._buffer_size)
if 0 == len(data):
self._file_ended = True
if self._autoclose:
self.close()
if 0 < len(self._line_part):
lines.append(self._line_part.decode(self._encoding))
self._line_part = ""
break
for line in data.splitlines(True):
self._line_part += line
if self._line_part.endswith(("\n", "\r")):
lines.append(self._line_part.decode(self._encoding).rstrip("\n\r"))
self._line_part = ""
if len(data) < self._buffer_size:
break
return (lines, not self._file_ended)
class process_line_reader(object):
def __init__(self, args, stdin_lines):
self._p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
self._reader = nonblocking_line_reader(self._p.stdout)
self._writer = nonblocking_line_writer(self._p.stdin, stdin_lines)
self._iterator = self._communicate()
def __iter__(self):
return self._iterator
def __enter__(self):
return self._iterator
def __exit__(self, type, value, traceback):
self.close()
return False
def _communicate(self):
read_set = [self._reader]
write_set = [self._writer]
while read_set or write_set:
try:
rlist, wlist, xlist = select.select(read_set, write_set, [])
except select.error, e:
if e.args[0] == errno.EINTR:
continue
raise
if self._reader in rlist:
stdout_lines, more = self._reader.continue_reading()
for line in stdout_lines:
yield line
if not more:
read_set.remove(self._reader)
if self._writer in wlist:
if not self._writer.continue_writing():
write_set.remove(self._writer)
self.close()
def lines(self):
return self._iterator
def close(self):
if self._iterator is not None:
self._reader.close()
self._writer.close()
self._p.wait()
self._iterator = None