Я пытаюсь осознать многопоточность и научиться её реализовывать в приложение PyQt. Нашёл, понятное дело, пример, начал в нём разбираться, но что-то не выходит:
import sys import traceback from PyQt5.QtCore import (QCoreApplication, QObject, QRunnable, QThread, QThreadPool, pyqtSignal, pyqtSlot) from PyQt5 import Qt # Если при ошибке в слотах приложение просто падает без стека, # есть хороший способ ловить такие ошибки: def log_uncaught_exceptions(ex_cls, ex, tb): text = '{}: {}:\n'.format(ex_cls.__name__, ex) #import traceback text += ''.join(traceback.format_tb(tb)) print(text) Qt.QMessageBox.critical(None, 'Error', text) quit() sys.excepthook = log_uncaught_exceptions class WorkerSignals(QObject): ''' Определяет сигналы, доступные из рабочего рабочего потока Worker(QRunnable).''' finish = pyqtSignal() error = pyqtSignal(tuple) result = pyqtSignal(object) progress = pyqtSignal(int) class Worker(QRunnable): ''' Наследует от QRunnable, настройки рабочего потока обработчика, сигналов и wrap-up. ''' def __init__(self, fn, *args, **kwargs): super(Worker, self).__init__() # Хранить аргументы конструктора (повторно используемые для обработки) self.fn = fn self.args = args self.kwargs = kwargs self.signals = WorkerSignals() print("\nfn=`{}`, \nargs=`{}`, kwargs=`{}`, \nself.signals=`{}`"\ .format(fn, args, kwargs, self.signals)) #== Добавьте обратный вызов в наши kwargs ====================================### kwargs['progress_callback'] = self.signals.progress print("kwargs['progress_callback']->`{}`\n".format(kwargs['progress_callback'])) @pyqtSlot() def run(self): # Получите args/kwargs здесь; и обработка с их использованием try: # выполняем метод `execute_this_fn` переданный из Main result = self.fn(*self.args, **self.kwargs) except: traceback.print_exc() exctype, value = sys.exc_info()[:2] self.signals.error.emit((exctype, value, traceback.format_exc())) else: # если ошибок не была, испускаем сигнал .result и передаем результат `result` self.signals.result.emit(result) # Вернуть результат обработки finally: self.signals.finish.emit() # Done / Готово # Подклассификация QThread # http://qt-project.org/doc/latest/qthread.html class AThread(QThread): threadSignalAThread = pyqtSignal(int) def __init__(self): super().__init__() def run(self): count = 0 while count < 1000: #time.sleep(1) Qt.QThread.msleep(200) count += 1 self.threadSignalAThread.emit(count) # Подкласс QObject и использование moveToThread class SomeObject(QObject): finishedSomeObject = pyqtSignal() threadSignalSomeObject = pyqtSignal(int) def long_running(self): print('SomeObject(QObject) id', int(QThread.currentThreadId())) count = 0 while count < 150: Qt.QThread.msleep(100) count += 1 self.threadSignalSomeObject.emit(count) self.finishedSomeObject.emit() class MsgBoxAThread(Qt.QDialog): """ Класс инициализации окна для визуализации дополнительного потока и кнопка для закрытия потокового окна, если поток остановлен! """ def __init__(self): super().__init__() layout = Qt.QVBoxLayout(self) self.label = Qt.QLabel("") layout.addWidget(self.label) close_btn = Qt.QPushButton("Close Окно") layout.addWidget(close_btn) # ------- Сигнал это только закроет окно, поток как работал, так и работает close_btn.clicked.connect(self.close) self.setGeometry(900, 65, 400, 80) self.setWindowTitle('MsgBox AThread(QThread)') class MsgBoxSomeObject(Qt.QDialog): def __init__(self): super().__init__() layout = Qt.QVBoxLayout(self) self.label = Qt.QLabel("") layout.addWidget(self.label) close_btn = Qt.QPushButton("Close Окно") layout.addWidget(close_btn) # ------- Сигнал это только закроет окно, поток как работал, так и работает close_btn.clicked.connect(self.close) self.setGeometry(900, 185, 400, 80) self.setWindowTitle('MsgBox SomeObject(QObject)') class MsgBoxWorker(Qt.QDialog): def __init__(self): super().__init__() layout = Qt.QVBoxLayout(self) self.label = Qt.QLabel("") layout.addWidget(self.label) close_btn = Qt.QPushButton("Close Окно") layout.addWidget(close_btn) # ------- Сигнал это только закроет окно, поток как работал, так и работает close_btn.clicked.connect(self.close) self.setGeometry(900, 300, 400, 80) self.setWindowTitle('MsgBox Worker(QRunnable)') class ExampleThread(Qt.QWidget): def __init__(self, parent=None): super(ExampleThread, self).__init__(parent) layout = Qt.QVBoxLayout(self) self.lbl = Qt.QLabel("Start") layout.addWidget(self.lbl) self.btnA = Qt.QPushButton("Запустить AThread(QThread)") layout.addWidget(self.btnA) self.btnB = Qt.QPushButton("Запустить SomeObject(QObject)") layout.addWidget(self.btnB) self.btnC = Qt.QPushButton("Запустить Worker(QRunnable)") layout.addWidget(self.btnC) self.progressBar = Qt.QProgressBar() self.progressBar.setProperty("value", 0) layout.addWidget(self.progressBar) self.setGeometry(550, 65, 300, 300) self.setWindowTitle('3 разных и простых способа работы с потоками.') self.btnA.clicked.connect(self.using_q_thread) self.btnB.clicked.connect(self.using_move_to_thread) self.btnC.clicked.connect(self.using_q_runnable) self.msg = MsgBoxAThread() self.thread = None self.msgSomeObject = MsgBoxSomeObject() self.objThread = None self.counter = 0 self.timer = Qt.QTimer() self.timer.setInterval(1000) # -------- timeout -------> def recurring_timer(self): self.timer.timeout.connect(self.recurring_timer) self.timer.start() self.threadpool = QThreadPool() print("Max потоков, кот. будут использоваться=`%d`" % self.threadpool.maxThreadCount()) self.msgWorker = MsgBoxWorker() self.threadtest = QThread(self) self.idealthreadcount = self.threadtest.idealThreadCount() print("Ваша машина может обрабатывать `{}` потокa оптимально.".format(self.idealthreadcount)) def recurring_timer(self): self.counter += 1 self.lbl.setText("СЧЁТЧИК цикл GUI: %d" % self.counter) # ---- AThread(QThread) -----------# def using_q_thread(self): if self.thread is None: self.thread = AThread() self.thread.threadSignalAThread.connect(self.on_threadSignalAThread) self.thread.finished.connect(self.finishedAThread) self.thread.start() self.btnA.setText("Stop AThread(QThread)") else: self.thread.terminate() self.thread = None self.btnA.setText("Start AThread(QThread)") def finishedAThread(self): self.thread = None self.btnA.setText("Start AThread(QThread)") def on_threadSignalAThread(self, value): self.msg.label.setText(str(value)) # Восстанавливаем визуализацию потокового окна, если его закрыли. Поток работает. # .setVisible(true) или .show() устанавливает виджет в видимое состояние, # если видны все его родительские виджеты до окна. if not self.msg.isVisible(): self.msg.show() # --END-- AThread(QThread) -------------------# # ---- SomeObject(QObject) -------------------# def using_move_to_thread(self): if self.objThread is None: self.objThread = QThread() self.obj = SomeObject() self.obj.moveToThread(self.objThread) # Переместить в поток для выполнения self.obj.threadSignalSomeObject.connect(self.on_threadSignalSomeObject) self.obj.finishedSomeObject.connect(self.finishedSomeObject) self.objThread.started.connect(self.obj.long_running) self.objThread.start() self.btnB.setText("Wait SomeObject(QObject)") self.btnB.setEnabled(False) else: pass def finishedSomeObject(self): self.objThread.terminate() self.objThread.wait(1) self.objThread = None self.btnB.setEnabled(True) self.btnB.setText("Start SomeObject(QObject)") def on_threadSignalSomeObject(self, value): self.msgSomeObject.label.setText(str(value)) # Восстанавливаем визуализацию потокового окна, если его закрыли. Поток работает. if not self.msgSomeObject.isVisible(): self.msgSomeObject.show() # --END-- SomeObject(QObject) -------------------# # ---- Worker(QRunnable) ------------------------# def using_q_runnable(self): # Передайте функцию для выполнения # Любые другие аргументы, kwargs передаются функции run worker = Worker(self.execute_this_fn) worker.signals.result.connect(self.print_output) worker.signals.finish.connect(self.thread_complete) worker.signals.progress.connect(self.progress_fn) self.threadpool.start(worker) def progress_fn(self, n): self.progressBar.setValue(n) self.msgWorker.label.setText(str(n)) # Восстанавливаем визуализацию потокового окна, если его закрыли. Поток работает. if not self.msgWorker.isVisible(): self.msgWorker.show() def execute_this_fn(self, progress_callback): for n in range(0, 11): Qt.QThread.msleep(600) progress_callback.emit(n*100/10) return "Готово." def print_output(self, s): print("\ndef print_output(self, s):", s) def thread_complete(self): print("\nTHREAD ЗАВЕРШЕН!, self->", self) # --END-- Worker QRunnable) -------------------# #==============================================### # потоки или процессы должны быть завершены ### def closeEvent(self, event): reply = Qt.QMessageBox.question\ (self, 'Информация', "Вы уверены, что хотите закрыть приложение?", Qt.QMessageBox.Yes, Qt.QMessageBox.No) if reply == Qt.QMessageBox.Yes: if self.thread: self.thread.quit() del self.thread self.msg.close() if self.objThread: self.objThread.setTerminationEnabled(True) self.objThread.terminate() self.objThread.wait(1) self.msgSomeObject.close() # закрыть поток Worker(QRunnable) self.msgWorker.close() super(ExampleThread, self).closeEvent(event) else: event.ignore() if __name__ == '__main__': app = Qt.QApplication([]) mw = ExampleThread() mw.show() app.exec()
На этой основе я бы хотел создать простое приложение, суть которого в следующем: есть окно с кнопкой. Кнопка позволяет сделать клик и дабл-клик. При клике, если поток не запущен, поток запускается. Там какой-то счётчик и начинается мотать. При повторном одиночном клике счётчик приостанавливается, ставится на паузу. Если ткнуть ещё один раз, то счётчик запускается с места паузы. Если сделать двойной клик, то поток останавливается и счётчик сбрасывается.
Начал писать, копировать.. и вот не могу понять, как сделать (не надо, как в первом примере, всплывающих окон и прочее).
Вот пока остановился на этом:
import sys import traceback from PyQt5.QtCore import (QCoreApplication, QObject, QRunnable, QThread, QThreadPool, pyqtSignal, pyqtSlot) from PyQt5 import Qt class QDoublePushButton(Qt.QPushButton): doubleClicked = pyqtSignal() clicked = pyqtSignal() def __init__(self, *args, **kwargs): Qt.QPushButton.__init__(self, *args, **kwargs) self.timer = Qt.QTimer() self.timer.setSingleShot(True) self.timer.timeout.connect(self.clicked.emit) super().clicked.connect(self.checkDoubleClick) @pyqtSlot() def checkDoubleClick(self): if self.timer.isActive(): self.doubleClicked.emit() self.timer.stop() else: self.timer.start(250) AThread_flag =False class AThread(QThread): threadSignalAThread = pyqtSignal(int) def __init__(self): super().__init__() def run(self, count = 0): global AThread_flag if AThread_flag == True: count = count while count < 1000: #time.sleep(1) Qt.QThread.msleep(200) count += 1 self.threadSignalAThread.emit(count) #yield count class ExampleThread(Qt.QWidget): def __init__(self, parent=None): super(ExampleThread, self).__init__(parent) layout_V = Qt.QVBoxLayout(self) self.lbl = Qt.QLabel("Начало") layout_V.addWidget(self.lbl) layout_H = Qt.QHBoxLayout(self) layout_V.addLayout(layout_H) self.btnStart = QDoublePushButton("Запустить AThread(QThread)") layout_H.addWidget(self.btnStart) self.btnStart.clicked.connect(self.on_click) self.btnStart.doubleClicked.connect(self.on_doubleclick) self.thread = None self.count = 0 global AThread_flag @pyqtSlot() def on_doubleclick(self): print("Doubleclick") @pyqtSlot() def on_click(self): print("Click") if self.thread is None and self.count==0: AThread_flag = True self.thread = AThread() self.thread.threadSignalAThread.connect(self.on_threadSignalAThread) self.thread.finished.connect(self.finishedAThread) self.thread.start(0) self.btnStart.setText("Pause // Stop AThread(QThread)") elif self.thread is not None and self.count != 0 and not(self.thread.isRunning()): self.thread.threadSignalAThread.connect(self.on_threadSignalAThread) self.thread.finished.connect(self.finishedAThread) self.thread.start(self.count) self.btnStart.setText("Pause // Stop AThread(QThread)") elif self.thread is not None and self.count != 0 and self.thread.isRunning(): self.count = self.thread.run() self.btnStart.setText("Proceed AThread(QThread)") else: self.thread.terminate() self.thread = None self.btnStart.setText("Star AThread(QThread)") def using_q_thread(self): if self.thread is not None and self.count != 0: self.thread.terminate() self.thread = None self.btnStart.setText("Запустить AThread(QThread)") def on_threadSignalAThread(self, value): self.lbl.setText(str(value)) # Восстанавливаем визуализацию потокового окна, если его закрыли. Поток работает. # .setVisible(true) или .show() устанавливает виджет в видимое состояние, # если видны все его родительские виджеты до окна. def finishedAThread(self): self.thread = None self.btnStart.setText("Start AThread(QThread)") if __name__ == '__main__': app = Qt.QApplication([]) mw = ExampleThread() mw.show() app.exec()
Буду благодарен за помощь и разъяснение.
Спасибо.
Крос, простите, если что… но очень надо ответ найти(