это 2 потока. Первый это таймаут, второй - приём передача данных
class TimeOutThreadClass(QThread):
time_out_signal = pyqtSignal(bytes)
def __init__(self, parent=None):
super(TimeOutThreadClass, self).__init__(parent)
def run(self):
self.sleep(2)
time_error = [0xAA, 0x00, 0x00, 0x00, 0x00, 0x00, 0xE1]
self.time_out_signal.emit(bytes(time_error))
class SerialThreadClass(QThread):
rx_signal = pyqtSignal(bytes)
avaliable_comport_list = ['']
def __init__(self, port, parent=None):
super(SerialThreadClass, self).__init__(parent)
# open the serial port
self.serport = QSerialPort()
self.serport.setBaudRate(14400)
self.serport.setPortName(port)
self.serport.open(QIODevice.ReadWrite)
def select_port(self):
self.my_ports = QSerialPortInfo()
ports = self.my_ports.availablePorts()
for port in ports:
port_name = port.portName()
SerialThreadClass.avaliable_comport_list.append(port_name)
print(SerialThreadClass.avaliable_comport_list)
return SerialThreadClass.avaliable_comport_list
def run(self):
while True:
rx_data = self.serport.readLine()
if len(rx_data) > 0:
self.rx_signal.emit(bytes(rx_data))
self.msleep(5)
def senddata(self, data):
tx_data = bytes(data)
self.serport.write(tx_data)
после выбора нужного порта запускаем
[code python]
def list_comport(self):
port = self.comboBox.currentText()
self.my_thread = SerialThreadClass(port)
self.time_out_thread = TimeOutThreadClass()
if self.my_thread.serport.isOpen() is True:
self.textBrowser.append('порт {} открыт'.format(str(port)))
self.my_thread.rx_signal.connect(self.on_change, QtCore.Qt.QueuedConnection)
self.time_out_thread.time_out_signal.connect(self.on_change, QtCore.Qt.QueuedConnection)
self.my_thread.start()
[/code]
по нажатию кнопки;
def btn_init(self):
my_list = [0xAA, 0xB0, 0x00, 0xCD, 0x00, 0x00, 0x00]
self.my_thread.senddata(my_list)
self.time_out_thread.start()
после приёма данных проверяем на ошибки
def check_error_rx(self, data):
if len(data) != 7:
text_error = 'ошибка передачи данных'
[b] self.time_out_thread.terminate()[/b]
return text_error
if data[6] == 0xE1:
text_error = 'нет связи или устройство не отвечает'
return text_error
text_error = 'Ok'
return text_error
и вот этом методе пытаюсь остановить поток. Вроде как по документации не желательно использовать terminate. Должно останавливать, но нет.
Код немного кусками, извините. просто я сначала пробую работу в маленьких программах, а затем, когда уже разобрался, переношу в своё приложение. Можно было бы использовать TIMEOUT из QSerialPort(), но не пойму как настроить. Ошибка по таймауту есть, а в настройках порта его настройки нет. Можно конечно попробовать PYSERIAL, но т.к. делаю в PyQt5, хочется обойтись штатными средствами