Многопроцессорные схемы графического интерфейса для борьбы с блокировкой "Неответ"
Каковы наилучшие способы создания многопроцессорной/графической системы кодирования?
Я хотел бы создать место для интернет-сообщества и найти примеры того, как использовать модуль multiprocessing
в python.
Я видел несколько небольших примеров процессов multiprocessing
в Интернете простых глобальных функций, которые вызываются в основном модуле, но я обнаружил, что это редко легко переводится во что-либо, что кто-то действительно делает с точки зрения графического интерфейса. Я думаю, что многие программы будут иметь функции, которые они хотят использовать в отдельном процессе как методы объектов (которые могут быть агрегатами других объектов и т.д.), И, возможно, один элемент GUI будет иметь связанный объект, который должен вызывать это процесс и т.д.
Например, у меня есть относительно сложная программа, и у меня возникают проблемы с получением гибкого графического интерфейса для нее, что, как я полагал, связано с отсутствием понимания в multiprocessing
и потокованием с помощью QThread
. Тем не менее, я знаю, что приведенный ниже пример будет, по крайней мере, передавать информацию между процессами, как мне хочется (из-за возможности выполнять инструкции print
), но мой графический интерфейс по-прежнему блокируется. Кто-нибудь знает, что может быть причиной этого, и если это все еще проблема с моим непониманием в архитектуре с многопотоковой/многопроцессорной обработкой?
Вот небольшой пример псевдокода того, что я делаю:
class Worker:
...
def processing(self, queue):
# put stuff into queue in a loop
# This thread gets data from Worker
class Worker_thread(QThread):
def __init__(self):
...
# make process with Worker inside
def start_processing(self):
# continuously get data from Worker
# send data to Tab object with signals/slots
class Tab(QTabWidget):
# spawn a thread separate from main GUI thread
# update GUI using slot
def update_GUI()
И этот код является полностью компилируемым примером, который воплощает вышележащую структуру моей программы:
from PyQt4 import QtCore, QtGui
import multiprocessing as mp
import numpy as np
import sys
import time
# This object can hold several properties which will be used for the processing
# and will be run in the background, while it updates a thread with all of it progress
class Worker:
def __init__(self, some_var):
self.some_var = some_var
self.iteration = 0
def some_complex_processing(self, queue):
for i in range(0,5000):
self.iteration += 1
queue.put(self.iteration)
queue.put('done with processing')
# This Woker_thread is a thread which will spawn a separate process (Worker).
# This separate is needed in order to separate the data retrieval
# from the main GUI thread, which should only quickly update when needed
class Worker_thread(QtCore.QThread):
# signals and slots are used to communicate back to the main GUI thread
update_signal = QtCore.pyqtSignal(int)
done_signal = QtCore.pyqtSignal()
def __init__(self, parent, worker):
QtCore.QThread.__init__(self, parent)
self.queue = mp.Queue()
self.worker = worker
self.parent = parent
self.process = mp.Process(target=self.worker.some_complex_processing, args=(self.queue,))
# When the process button is pressed, this function will start getting data from Worker
# this data is then retrieved by the queue and pushed through a signal
# to Tab.update_GUI
@QtCore.pyqtSlot()
def start_computation(self):
self.process.start()
while(True):
try:
message = self.queue.get()
self.update_signal.emit(message)
except EOFError:
pass
if message == 'done with processing':
self.done_signal.emit()
break
#self.parent.update_GUI(message)
self.process.join()
return
# Each tab will start it own thread, which will spawn a process
class Tab(QtGui.QTabWidget):
start_comp = QtCore.pyqtSignal()
def __init__(self, parent, this_worker):
self.parent = parent
self.this_worker = this_worker
QtGui.QTabWidget.__init__(self, parent)
self.treeWidget = QtGui.QTreeWidget(self)
self.properties = QtGui.QTreeWidgetItem(self.treeWidget, ["Properties"])
self.step = QtGui.QTreeWidgetItem(self.properties, ["Iteration #"])
self.thread = Worker_thread(parent=self, worker=self.this_worker)
self.thread.update_signal.connect(self.update_GUI)
self.thread.done_signal.connect(self.thread.quit)
self.start_comp.connect(self.thread.start_computation)
self.thread.start()
###############################
# Here is what should update the GUI at every iteration of Worker.some_complex_processing()
# The message appears to be getting sent, due to seeing the print statement in the console, but the GUI is not updated.
@QtCore.pyqtSlot(int)
def update_GUI(self, iteration):
self.step.setText(0, str(iteration))
#time.sleep(0.1)
print iteration
def start_signal_emit(self):
self.start_comp.emit()
# GUI stuff
class MainWindow(QtGui.QMainWindow):
def __init__(self, parent = None):
QtGui.QMainWindow.__init__(self)
self.tab_list = []
self.setTabShape(QtGui.QTabWidget.Rounded)
self.centralwidget = QtGui.QWidget(self)
self.top_level_layout = QtGui.QGridLayout(self.centralwidget)
self.tabWidget = QtGui.QTabWidget(self.centralwidget)
self.top_level_layout.addWidget(self.tabWidget, 1, 0, 25, 25)
process_button = QtGui.QPushButton("Process")
self.top_level_layout.addWidget(process_button, 0, 1)
QtCore.QObject.connect(process_button, QtCore.SIGNAL("clicked()"), self.process)
self.setCentralWidget(self.centralwidget)
self.centralwidget.setLayout(self.top_level_layout)
# Make Tabs in loop from button
for i in range(0,10):
name = 'tab' + str(i)
self.tab_list.append(Tab(self.tabWidget, Worker(name)))
self.tabWidget.addTab(self.tab_list[-1], name)
# Do the processing
def process(self):
for tab in self.tab_list:
tab.start_signal_emit()
return
if __name__ == "__main__":
app = QtGui.QApplication([])
win = MainWindow()
win.show()
sys.exit(app.exec_())
Дополнительная информация:
Я пишу программу, с которой я хотел бы создать несколько процессов и постоянно их показывать на протяжении всей их обработки. Я хотел бы, чтобы программа была многопроцессорной, чтобы получить максимальную скорость из программы, насколько это возможно.
В настоящий момент я пытаюсь использовать поток для создания процесса и использования сигналов и слотов для обновления GUI, когда данные непрерывно получаются в очереди. Похоже, что queues
, signals
и slots
работают при использовании операторов print
, но не могут обновить GUI. Если у кого-нибудь есть какие-либо другие предложения относительно того, как я должен структурировать это, чтобы сохранить программу более управляемой, я хотел бы узнать.
РЕДАКТИРОВАТЬ. Я внес коррективы, выдвинутые Мин Лином, с добавлением make Worker
a QObject
, чтобы moveToThread()
работал.
Вот новый код, который у меня есть на данный момент:
from PyQt4 import QtCore, QtGui
import multiprocessing as mp
import numpy as np
import sys
import time
class Worker(QtCore.QObject):
update_signal = QtCore.pyqtSignal(int)
done_signal = QtCore.pyqtSignal()
def __init__(self, some_var):
QtCore.QObject.__init__(self, parent=None)
self.some_var = some_var
self.iteration = 0
self.queue = mp.Queue()
self.process = mp.Process(target=self.some_complex_processing, args=(self.queue,))
def some_complex_processing(self, queue):
for i in range(0,5000):
self.iteration += 1
queue.put(self.iteration)
queue.put('done with processing')
@QtCore.pyqtSlot()
def start_computation(self):
self.process.start()
while(True):
try:
message = self.queue.get()
self.update_signal.emit(message)
except EOFError:
pass
if message == 'done with processing':
self.done_signal.emit()
break
self.process.join()
return
class Tab(QtGui.QTabWidget):
start_comp = QtCore.pyqtSignal()
def __init__(self, parent, this_worker):
self.parent = parent
self.this_worker = this_worker
QtGui.QTabWidget.__init__(self, parent)
self.treeWidget = QtGui.QTreeWidget(self)
self.properties = QtGui.QTreeWidgetItem(self.treeWidget, ["Properties"])
self.step = QtGui.QTreeWidgetItem(self.properties, ["Iteration #"])
# Use QThread is enough
self.thread = QtCore.QThread();
# Change the thread affinity of worker to self.thread.
self.this_worker.moveToThread(self.thread);
self.this_worker.update_signal.connect(self.update_GUI)
self.this_worker.done_signal.connect(self.thread.quit)
self.start_comp.connect(self.this_worker.start_computation)
self.thread.start()
###############################
# Here is what should update the GUI at every iteration of Worker.some_complex_processing()
# The message appears to be getting sent, due to seeing the print statement in the console, but the GUI is not updated.
@QtCore.pyqtSlot(int)
def update_GUI(self, iteration):
self.step.setText(0, str(iteration))
#time.sleep(0.1)
print iteration
def start_signal_emit(self):
self.start_comp.emit()
# GUI stuff
class MainWindow(QtGui.QMainWindow):
def __init__(self, parent = None):
QtGui.QMainWindow.__init__(self)
self.tab_list = []
self.setTabShape(QtGui.QTabWidget.Rounded)
self.centralwidget = QtGui.QWidget(self)
self.top_level_layout = QtGui.QGridLayout(self.centralwidget)
self.tabWidget = QtGui.QTabWidget(self.centralwidget)
self.top_level_layout.addWidget(self.tabWidget, 1, 0, 25, 25)
process_button = QtGui.QPushButton("Process")
self.top_level_layout.addWidget(process_button, 0, 1)
QtCore.QObject.connect(process_button, QtCore.SIGNAL("clicked()"), self.process)
self.setCentralWidget(self.centralwidget)
self.centralwidget.setLayout(self.top_level_layout)
# Make Tabs in loop from button
for i in range(0,10):
name = 'tab' + str(i)
self.tab_list.append(Tab(self.tabWidget, Worker(name)))
self.tabWidget.addTab(self.tab_list[-1], name)
# Do the processing
def process(self):
for tab in self.tab_list:
tab.start_signal_emit()
return
if __name__ == "__main__":
app = QtGui.QApplication([])
win = MainWindow()
win.show()
sys.exit(app.exec_())
Благодарим вас за все ответы, я ценю уровень детализации, который каждый из вас включил в описание идеи, которую они считают решением, но, к сожалению, я еще не смог выполнить эти типы процессов, которые работают на объект, к которому они принадлежат, при отображении атрибута объекта в графическом интерфейсе.
Тем не менее, я узнал о приличном количестве этого сообщения, что позволило мне понять, что в настоящее время в потоковой версии есть висящий графический интерфейс, так как функция обновления графического интерфейса слишком велика и требует слишком большой обработки.
Итак, я применил подход QTimer()
к моей многопоточной версии и работает намного лучше! Я бы посоветовал всем, кто сталкивался с подобными проблемами, хотя бы попытаться что-то подобное этому.
Я не знал об этом подходе к решению проблем с обновлением GUI, и теперь это псевдо или временное исправление проблемы, с которой я сталкиваюсь.
Ответы
Ответ 1
Приложение GUI идеально подходит для тестирования материалов, так как легко создавать новые задачи и визуализировать происходящее, поэтому я написал небольшое примерное приложение (Скриншот, код ниже), поскольку я хотел узнать его для себя.
Сначала я использовал аналогичный подход, как и ваш, пытаясь реализовать шаблон Consumer/Producer, и я боролся с фонологическими процессами, делающими бесконечные циклы, чтобы ждать новых рабочих мест и заботиться о связи назад и вперед для себя. Затем я узнал об интерфейсе Pool, после чего я смог заменить весь этот хитрый код всего несколькими строками. Все, что вам нужно, это один пул и несколько обратных вызовов:
#!/usr/bin/env python3
import multiprocessing, time, random, sys
from PySide.QtCore import * # equivalent: from PyQt4.QtCore import *
from PySide.QtGui import * # equivalent: from PyQt4.QtGui import *
def compute(num):
print("worker() started at %d" % num)
random_number = random.randint(1, 6)
if random_number in (2, 4, 6):
raise Exception('Random Exception in _%d' % num)
time.sleep(random_number)
return num
class MainWindow(QMainWindow):
def __init__(self):
QMainWindow.__init__(self)
self.toolBar = self.addToolBar("Toolbar")
self.toolBar.addAction(QAction('Add Task', self, triggered=self.addTask))
self.list = QListWidget()
self.setCentralWidget(self.list)
# Pool of Background Processes
self.pool = multiprocessing.Pool(processes=4)
def addTask(self):
num_row = self.list.count()
self.pool.apply_async(func=compute, args=(num_row,), callback=self.receiveResult,
error_callback=self.receiveException)
item = QListWidgetItem("item %d" % num_row)
item.setForeground(Qt.gray)
self.list.addItem(item)
def receiveResult(self, result):
assert isinstance(result, int)
print("end_work(), where result is %s" % result)
self.list.item(result).setForeground(Qt.darkGreen)
def receiveException(self, exception):
error = str(exception)
_pos = error.find('_') + 1
num_row = int(error[_pos:])
item = self.list.item(num_row)
item.setForeground(Qt.darkRed)
item.setText(item.text() + ' Retry...')
self.pool.apply_async(func=compute, args=(num_row,), callback=self.receiveResult,
error_callback=self.receiveException)
if __name__ == '__main__':
app = QApplication(sys.argv)
main_window = MainWindow()
main_window.show()
sys.exit(app.exec_())
Изменить: я сделал еще один пример, используя QTimer вместо Callbacks, периодически проверяя записи в очереди, обновляя QProgressBar:
#!/usr/bin/env python3
import multiprocessing, multiprocessing.pool, time, random, sys
from PySide.QtCore import *
from PySide.QtGui import *
def compute(num_row):
print("worker started at %d" % num_row)
random_number = random.randint(1, 10)
for second in range(random_number):
progress = float(second) / float(random_number) * 100
compute.queue.put((num_row, progress,))
time.sleep(1)
compute.queue.put((num_row, 100))
def pool_init(queue):
# see http://stackoverflow.com/a/3843313/852994
compute.queue = queue
class MainWindow(QMainWindow):
def __init__(self):
QMainWindow.__init__(self)
self.toolBar = self.addToolBar("Toolbar")
self.toolBar.addAction(QAction('Add Task', self, triggered=self.addTask))
self.table = QTableWidget()
self.table.verticalHeader().hide()
self.table.setColumnCount(2)
self.setCentralWidget(self.table)
# Pool of Background Processes
self.queue = multiprocessing.Queue()
self.pool = multiprocessing.Pool(processes=4, initializer=pool_init, initargs=(self.queue,))
# Check for progress periodically
self.timer = QTimer()
self.timer.timeout.connect(self.updateProgress)
self.timer.start(2000)
def addTask(self):
num_row = self.table.rowCount()
self.pool.apply_async(func=compute, args=(num_row,))
label = QLabel("Queued")
bar = QProgressBar()
bar.setValue(0)
self.table.setRowCount(num_row + 1)
self.table.setCellWidget(num_row, 0, label)
self.table.setCellWidget(num_row, 1, bar)
def updateProgress(self):
if self.queue.empty(): return
num_row, progress = self.queue.get() # unpack
print("received progress of %s at %s" % (progress, num_row))
label = self.table.cellWidget(num_row, 0)
bar = self.table.cellWidget(num_row, 1)
bar.setValue(progress)
if progress == 100:
label.setText('Finished')
elif label.text() == 'Queued':
label.setText('Downloading')
self.updateProgress() # recursion
if __name__ == '__main__':
app = QApplication(sys.argv)
main_window = MainWindow()
main_window.show()
sys.exit(app.exec_())
Ответ 2
Большое вам спасибо за публикацию этого вопроса и всех участников за их вклад. Он предоставил мне полезные леса для экспериментов с PyQt и многопроцессорной обработкой.
Я начал с второго примера кода, указанного в вопросе. Мои изменения и комментарии:
- В Windows все аргументы
Process.__init__()
должны быть разборчивыми. Вы увидите, что @valmynd делает свою функцию compute
функцией верхнего уровня, именно по этой причине. Это отчасти потому, что многопроцессор будет reimport модулем, содержащим целевую функцию. Чтобы напомнить себе об этом, я пытаюсь включить целевую функцию в свой собственный модуль (и убедиться, что любая информация передается как аргументы с возможностью выбора). Я переместил функцию комплексной обработки в свой собственный модуль под названием workermodule.py
.
- Без достаточной работы в функции комплексной обработки петля заканчивается слишком быстро, чтобы любые изменения могли отображаться в графическом интерфейсе. Поэтому я добавил дополнительную (бесполезную) работу внутри сложной функции обработки. Как отмечено в комментарии, вы можете
time.sleep
, но более приятно осветить все ядра немного.
- Со следующими двумя фрагментами кода я получаю плавный графический интерфейс с постоянными обновлениями итерационного значения, а подпроцессы работают на полной скорости.
- Обратите внимание, что
self.process
может быть создан с двумя очередями в качестве аргументов, один для ввода и один для вывода. Тогда комплексная функция обработки должна периодически проверять входную очередь для данных.
workermodule.py:
import time
def some_complex_processing(queue):
iteration = 0
for i in range(0,5000):
iteration += 1
queue.put(iteration)
#time.sleep(20e-3) # ms
# You could time.sleep here to simulate a
# long-running process, but just to prove
# that we're not cheating, let make this
# process work hard, while the GUI process
# should still have smooth drawing.
for x in range(100000):
y = x
queue.put('done with processing')
mainfile.py:
from PyQt4 import QtCore, QtGui
import multiprocessing as mp
import sys
import workermodule
class Worker(QtCore.QObject):
update_signal = QtCore.pyqtSignal(int)
done_signal = QtCore.pyqtSignal()
def __init__(self, some_var):
QtCore.QObject.__init__(self, parent=None)
self.some_var = some_var
self.queue = mp.Queue()
self.process = mp.Process(
target=workermodule.some_complex_processing,
args=(self.queue,)
)
@QtCore.pyqtSlot()
def start_computation(self):
self.process.start()
while True:
try:
message = self.queue.get()
self.update_signal.emit(message)
except EOFError:
pass
if message == 'done with processing':
self.done_signal.emit()
break
self.process.join()
return
class Tab(QtGui.QTabWidget):
start_comp = QtCore.pyqtSignal()
def __init__(self, parent, this_worker):
self.parent = parent
self.this_worker = this_worker
QtGui.QTabWidget.__init__(self, parent)
self.treeWidget = QtGui.QTreeWidget(self)
self.properties = QtGui.QTreeWidgetItem(self.treeWidget, ["Properties"])
self.step = QtGui.QTreeWidgetItem(self.properties, ["Iteration #"])
# Use QThread is enough
self.thread = QtCore.QThread();
# Change the thread affinity of worker to self.thread.
self.this_worker.moveToThread(self.thread);
self.this_worker.update_signal.connect(self.update_GUI)
self.this_worker.done_signal.connect(self.thread.quit)
self.start_comp.connect(self.this_worker.start_computation)
self.thread.start()
###############################
# Here is what should update the GUI at every iteration of Worker.some_complex_processing()
# The message appears to be getting sent, due to seeing the print statement in the console, but the GUI is not updated.
@QtCore.pyqtSlot(int)
def update_GUI(self, iteration):
self.step.setText(0, str(iteration))
print iteration
def start_signal_emit(self):
self.start_comp.emit()
# GUI stuff
class MainWindow(QtGui.QMainWindow):
def __init__(self, parent = None):
QtGui.QMainWindow.__init__(self)
self.tab_list = []
self.setTabShape(QtGui.QTabWidget.Rounded)
self.centralwidget = QtGui.QWidget(self)
self.top_level_layout = QtGui.QGridLayout(self.centralwidget)
self.tabWidget = QtGui.QTabWidget(self.centralwidget)
self.top_level_layout.addWidget(self.tabWidget, 1, 0, 25, 25)
process_button = QtGui.QPushButton("Process")
self.top_level_layout.addWidget(process_button, 0, 1)
QtCore.QObject.connect(process_button, QtCore.SIGNAL("clicked()"), self.process)
self.setCentralWidget(self.centralwidget)
self.centralwidget.setLayout(self.top_level_layout)
# Make Tabs in loop from button
for i in range(0,10):
name = 'tab' + str(i)
self.tab_list.append(Tab(self.tabWidget, Worker(name)))
self.tabWidget.addTab(self.tab_list[-1], name)
# Do the processing
def process(self):
for tab in self.tab_list:
tab.start_signal_emit()
return
if __name__ == "__main__":
app = QtGui.QApplication([])
win = MainWindow()
win.show()
sys.exit(app.exec_())
Ответ 3
Все, что вы сделали в Worker_Thread, должно быть перенесено на Работника.
Qt назначает привязку потоков к каждому из объектов на основе того, где создается объект. Объекты Worker_Thread создаются в основном потоке, поэтому он имеет сродство основного потока. Если сигнал из основного потока подключен к слоту объекта, созданного в основном потоке, слот также будет выполнен в основном потоке. (Независимо от этого QueuedConnection или DirectConnection). И слот блокирует графический интерфейс.
Сделайте это:
class Worker:
update_signal = QtCore.pyqtSignal(int)
done_signal = QtCore.pyqtSignal()
def __init__(self, some_var):
self.some_var = some_var
self.iteration = 0
self.queue = mp.Queue()
self.process = mp.Process(target=self.some_complex_processing, args=(self.queue,))
def some_complex_processing(self, queue):
for i in range(0,5000):
self.iteration += 1
queue.put(self.iteration)
queue.put('done with processing')
@QtCore.pyqtSlot()
def start_computation(self):
self.process.start()
while(True):
try:
message = self.queue.get()
self.update_signal.emit(message)
except EOFError:
pass
if message == 'done with processing':
self.done_signal.emit()
break
self.process.join()
return
class Tab(QtGui.QTabWidget):
start_comp = QtCore.pyqtSignal()
def __init__(self, parent, this_worker):
self.parent = parent
self.this_worker = this_worker
QtGui.QTabWidget.__init__(self, parent)
self.treeWidget = QtGui.QTreeWidget(self)
self.properties = QtGui.QTreeWidgetItem(self.treeWidget, ["Properties"])
self.step = QtGui.QTreeWidgetItem(self.properties, ["Iteration #"])
# Use QThread is enough
self.thread = QtCore.QThread();
# Change the thread affinity of worker to self.thread.
self.this_worker.moveToThread(self.thread);
self.this_worker.update_signal.connect(self.update_GUI)
self.this_worker.done_signal.connect(self.thread.quit)
self.start_comp.connect(self.this_worker.start_computation)
self.thread.start()
###############################
# Here is what should update the GUI at every iteration of Worker.some_complex_processing()
# The message appears to be getting sent, due to seeing the print statement in the console, but the GUI is not updated.
@QtCore.pyqtSlot(int)
def update_GUI(self, iteration):
self.step.setText(0, str(iteration))
#time.sleep(0.1)
print iteration
def start_signal_emit(self):
self.start_comp.emit()
Ответ 4
Хорошо, я не знаком с Qt, но я сделал аналогичную работу с Tkinter. Я уверен, что вы работаете в Python Global Interpreter Lock здесь. В частности, вы запускаете свою очередь и свое графическое приложение в том же потоке, поэтому, когда блоки очереди ждут ввода, графический интерфейс также блокируется. Попробуйте запустить app = QtGui.QApplication([])
в своем потоке или процессе. Написание графического интерфейса с очередями всегда бывает сложным, и я обнаружил, что обычно требуется, по крайней мере, еще один слой потока, чем я начинаю ожидать от него.