Многопроцессорность Python: как я могу НАДЕЖДАТЬ перенаправление stdout из дочернего процесса?
NB. Я видел Log output of multiprocessing.Process - к сожалению, он не отвечает на этот вопрос.
Я создаю дочерний процесс (в окнах) через многопроцессорную обработку. Я хочу, чтобы все дочерние процессы stdout и stderr были перенаправлены в файл журнала, а не появлялись на консоли. Единственное, что я видел, - это для дочернего процесса установить sys.stdout в файл. Однако это не позволяет перенаправить весь вывод stdout из-за поведения перенаправления stdout в Windows.
Чтобы проиллюстрировать эту проблему, создайте DLL Windows со следующим кодом
#include <iostream>
extern "C"
{
__declspec(dllexport) void writeToStdOut()
{
std::cout << "Writing to STDOUT from test DLL" << std::endl;
}
}
Затем создайте и запустите python script следующим образом, который импортирует эту DLL и вызывает функцию:
from ctypes import *
import sys
print
print "Writing to STDOUT from python, before redirect"
print
sys.stdout = open("stdout_redirect_log.txt", "w")
print "Writing to STDOUT from python, after redirect"
testdll = CDLL("Release/stdout_test.dll")
testdll.writeToStdOut()
Чтобы увидеть то же поведение, что и я, вероятно, необходимо, чтобы DLL была построена против другого времени выполнения C, чем тот, который использует один Python. В моем случае, python построен с Visual Studio 2010, но моя DLL построена с VS 2005.
Поведение, которое я вижу, это то, что консоль показывает:
> stdout_test.py
Writing to STDOUT from python, before redirect
Writing to STDOUT from test DLL
Пока файл stdout_redirect_log.txt заканчивается, содержащий:
Writing to STDOUT from python, after redirect
Другими словами, при установке sys.stdout не удалось перенаправить вывод stdout, сгенерированный DLL. Это неудивительно, учитывая характер базовых API для перенаправления stdout в Windows. Я столкнулся с этой проблемой на уровне native/С++ и никогда не нашел способ надежно перенаправить stdout изнутри процесса. Это должно быть сделано извне.
На самом деле это причина, по которой я запускаю дочерний процесс - это так, что я могу подключаться извне к его трубам и тем самым гарантировать, что я перехватываю весь его вывод. Я могу определенно сделать это, запустив процесс вручную с помощью pywin32, но мне очень хотелось бы иметь возможность использовать возможности многопроцессорности, в частности способность связываться с дочерним процессом через многопроцессорный объект Pipe, чтобы добиться прогресса обновления. Вопрос заключается в том, есть ли способ использовать многопроцессорность для своих объектов IPC и, чтобы надежно перенаправить все дочерние stdout и stderr в файл.
ОБНОВЛЕНИЕ:. Изучая исходный код для многопроцессорности. Процессы, он имеет статический член, _Popen, который выглядит так, как будто он может использоваться для переопределения класса, используемого для создания процесса. Если он установлен на None (по умолчанию), он использует multiprocessing.forking._Popen, но похоже, говоря
multiprocessing.Process._Popen = MyPopenClass
Я мог бы переопределить создание процесса. Однако, хотя я мог бы извлечь это из multiprocessing.forking._Popen, похоже, мне пришлось бы скопировать кучу внутренних вещей в мою реализацию, что звучит шероховато и не очень надежно для будущего. Если это единственный выбор, я думаю, что я, вероятно, мог бы сделать все это вручную с помощью pywin32.
Ответы
Ответ 1
Решение, которое вы предлагаете, является хорошим: создайте свои процессы вручную, чтобы у вас был явный доступ к файлам файлов stdout/stderr. Затем вы можете создать сокет для связи с подпроцессом и использовать multiprocessing.connection через этот сокет (multiprocessing.Pipe создает объект соединения того же типа, поэтому это должно дать вам все те же функции IPC).
Вот пример из двух файлов.
master.py:
import multiprocessing.connection
import subprocess
import socket
import sys, os
## Listen for connection from remote process (and find free port number)
port = 10000
while True:
try:
l = multiprocessing.connection.Listener(('localhost', int(port)), authkey="secret")
break
except socket.error as ex:
if ex.errno != 98:
raise
port += 1 ## if errno==98, then port is not available.
proc = subprocess.Popen((sys.executable, "subproc.py", str(port)), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
## open connection for remote process
conn = l.accept()
conn.send([1, "asd", None])
print(proc.stdout.readline())
subproc.py:
import multiprocessing.connection
import subprocess
import sys, os, time
port = int(sys.argv[1])
conn = multiprocessing.connection.Client(('localhost', port), authkey="secret")
while True:
try:
obj = conn.recv()
print("received: %s\n" % str(obj))
sys.stdout.flush()
except EOFError: ## connection closed
break
Вы также можете увидеть первый ответ на этот вопрос, чтобы получить неблокирующие чтения из подпроцесса.
Ответ 2
Я не думаю, что у вас есть лучший вариант, чем перенаправление подпроцесса в файл, как вы упомянули в своем комментарии.
Способ создания stdin/out/err работает в окнах - каждый процесс, когда он рождается, имеет std handles. Вы можете изменить их с помощью SetStdHandle. Когда вы изменяете python sys.stdout
, вы изменяете только то, где python печатает материал, а не где другие DLL печатают материал. Часть CRT в вашей DLL использует GetStdHandle, чтобы узнать, где печатать. Если вы хотите, вы можете делать все, что хотите, в API Windows в своей DLL или в вашем python script с pywin32. Хотя я думаю, что это будет проще с subprocess.
Ответ 3
Вы сталкиваетесь с проблемой, потому что процесс блокируется?
Посмотрите этот, он использует подпроцесс, но должен создать способ вокруг него, чтобы сделать его неблокирующим. Я предполагаю, что один и тот же трюк может работать с многопроцессорной обработкой.
Ответ 4
Я предполагаю, что я ухожу от базы и чего-то не хватает, но то, что стоит здесь, это то, что пришло мне в голову, когда я прочитал ваш вопрос.
Если вы можете перехватить все stdout и stderr (я получил это впечатление от вашего вопроса), почему бы вам не добавлять или не переносить эту функцию захвата вокруг каждого из ваших процессов? Затем отправляйте то, что захвачено через очередь, для потребителя, который может делать все, что угодно, со всеми выходами?
Ответ 5
В моей ситуации я изменил sys.stdout.write
для записи в PySide QTextEdit. Я не мог читать из sys.stdout
и я не знал, как изменить sys.stdout
чтобы сделать его читаемым. Я создал две трубы. Один для stdout, а другой для stderr. В отдельном процессе я перенаправляю sys.stdout
и sys.stderr
к дочернему соединению многопроцессорного канала. В основном процессе я создал два потока для чтения родительского канала stdout и stderr и перенаправления данных канала в sys.stdout
и sys.stderr
.
import sys
import contextlib
import threading
import multiprocessing as mp
import multiprocessing.queues
from queue import Empty
import time
class PipeProcess(mp.Process):
"""Process to pipe the output of the sub process and redirect it to this sys.stdout and sys.stderr.
Note:
The use_queue = True argument will pass data between processes using Queues instead of Pipes. Queues will
give you the full output and read all of the data from the Queue. A pipe is more efficient, but may not
redirect all of the output back to the main process.
"""
def __init__(self, group=None, target=None, name=None, args=tuple(), kwargs={}, *_, daemon=None,
use_pipe=None, use_queue=None):
self.read_out_th = None
self.read_err_th = None
self.pipe_target = target
self.pipe_alive = mp.Event()
if use_pipe or (use_pipe is None and not use_queue): # Default
self.parent_stdout, self.child_stdout = mp.Pipe(False)
self.parent_stderr, self.child_stderr = mp.Pipe(False)
else:
self.parent_stdout = self.child_stdout = mp.Queue()
self.parent_stderr = self.child_stderr = mp.Queue()
args = (self.child_stdout, self.child_stderr, target) + tuple(args)
target = self.run_pipe_out_target
super(PipeProcess, self).__init__(group=group, target=target, name=name, args=args, kwargs=kwargs,
daemon=daemon)
def start(self):
"""Start the multiprocess and reading thread."""
self.pipe_alive.set()
super(PipeProcess, self).start()
self.read_out_th = threading.Thread(target=self.read_pipe_out,
args=(self.pipe_alive, self.parent_stdout, sys.stdout))
self.read_err_th = threading.Thread(target=self.read_pipe_out,
args=(self.pipe_alive, self.parent_stderr, sys.stderr))
self.read_out_th.daemon = True
self.read_err_th.daemon = True
self.read_out_th.start()
self.read_err_th.start()
@classmethod
def run_pipe_out_target(cls, pipe_stdout, pipe_stderr, pipe_target, *args, **kwargs):
"""The real multiprocessing target to redirect stdout and stderr to a pipe or queue."""
sys.stdout.write = cls.redirect_write(pipe_stdout) # , sys.__stdout__) # Is redirected in main process
sys.stderr.write = cls.redirect_write(pipe_stderr) # , sys.__stderr__) # Is redirected in main process
pipe_target(*args, **kwargs)
@staticmethod
def redirect_write(child, out=None):
"""Create a function to write out a pipe and write out an additional out."""
if isinstance(child, mp.queues.Queue):
send = child.put
else:
send = child.send_bytes # No need to pickle with child_conn.send(data)
def write(data, *args):
try:
if isinstance(data, str):
data = data.encode('utf-8')
send(data)
if out is not None:
out.write(data)
except:
pass
return write
@classmethod
def read_pipe_out(cls, pipe_alive, pipe_out, out):
if isinstance(pipe_out, mp.queues.Queue):
# Queue has better functionality to get all of the data
def recv():
return pipe_out.get(timeout=0.5)
def is_alive():
return pipe_alive.is_set() or pipe_out.qsize() > 0
else:
# Pipe is more efficient
recv = pipe_out.recv_bytes # No need to unpickle with data = pipe_out.recv()
is_alive = pipe_alive.is_set
# Loop through reading and redirecting data
while is_alive():
try:
data = recv()
if isinstance(data, bytes):
data = data.decode('utf-8')
out.write(data)
except EOFError:
break
except Empty:
pass
except:
pass
def join(self, *args):
# Wait for process to finish (unless a timeout was given)
super(PipeProcess, self).join(*args)
# Trigger to stop the threads
self.pipe_alive.clear()
# Pipe must close to prevent blocking and waiting on recv forever
if not isinstance(self.parent_stdout, mp.queues.Queue):
with contextlib.suppress():
self.parent_stdout.close()
with contextlib.suppress():
self.parent_stderr.close()
# Close the pipes and threads
with contextlib.suppress():
self.read_out_th.join()
with contextlib.suppress():
self.read_err_th.join()
def run_long_print():
for i in range(1000):
print(i)
print(i, file=sys.stderr)
print('finished')
if __name__ == '__main__':
# Example test write (My case was a QTextEdit)
out = open('stdout.log', 'w')
err = open('stderr.log', 'w')
# Overwrite the write function and not the actual stdout object to prove this works
sys.stdout.write = out.write
sys.stderr.write = err.write
# Create a process that uses pipes to read multiprocess output back into sys.stdout.write
proc = PipeProcess(target=run_long_print, use_queue=True) # If use_pipe=True Pipe may not write out all values
# proc.daemon = True # If daemon and use_queue Not all output may be redirected to stdout
proc.start()
# time.sleep(5) # Not needed unless use_pipe or daemon and all of stdout/stderr is desired
# Close the process
proc.join() # For some odd reason this blocks forever when use_queue=False
# Close the output files for this test
out.close()
err.close()