完成客户端的基本收发功能

This commit is contained in:
clinton 2025-07-14 16:40:07 +08:00
parent 94b7ceedc4
commit 232413f6d4
3 changed files with 82 additions and 4 deletions

View File

@ -0,0 +1,65 @@
# MQTT在线测试平台
## 项目简介
本项目基于 PySide6Qt for Python、paho-mqtt 和 Poetry结合 Mosquitto Broker旨在为远程设备提供一个可视化、可扩展的 MQTT 在线测试平台。支持多协议解析、响应机制扩展、动态配置和消息日志查看。
## 主要功能
- 支持远程设备通过MQTT协议连接、订阅、发布
- 支持多种协议解析JSON、XML、自定义协议等
- 支持多种响应机制(报警、自动回复、设备控制等)
- UI界面可动态配置MQTT参数、主题、协议类型、响应机制
- 实时显示设备连接状态和消息日志
- 配置可保存和自动加载
## 安装与运行
1. 安装依赖(建议使用 Poetry
```bash
poetry install
```
2. 运行主程序:
```bash
poetry run python src/mqtt_acess/main.py
```
3. 启动后可在界面上配置MQTT参数、主题、协议类型、响应机制并进行连接、订阅、发布等操作。
## 目录结构
```
mqtt_acess/
├── README.md
├── pyproject.toml
├── poetry.lock
├── src/
│ └── mqtt_acess/
│ ├── main.py # 程序入口
│ ├── config/ # 配置管理
│ ├── mqtt/ # MQTT客户端封装
│ ├── parser/ # 协议解析工厂与实现
│ ├── responder/ # 响应机制工厂与实现
│ └── ui/ # UI界面
└── tests/ # 单元测试
```
## 配置说明
- 所有配置MQTT服务器、端口、主题、协议类型、响应机制等可在UI界面填写并保存。
- 配置文件保存在 `src/mqtt_acess/config/user_config.json`,下次启动自动加载。
## 协议与响应机制扩展
### 扩展协议解析
1. 在 `src/mqtt_acess/parser/` 下新增解析器(如 `xxx_parser.py`),继承 `IParser` 并实现 `parse` 方法。
2. 在 `factory.py` 中注册新协议类型。
### 扩展响应机制
1. 在 `src/mqtt_acess/responder/` 下新增响应器(如 `xxx.py`),继承 `IResponder` 并实现 `respond` 方法。
2. 在 `ui/main_window.py``get_responder` 方法中注册新类型。
## 依赖环境
- Python >= 3.11
- PySide6 >= 6.5, < 6.6
- paho-mqtt
- poetry

View File

@ -1,11 +1,11 @@
{ {
"MQTT_BROKER": "localhost", "MQTT_BROKER": "172.16.19.153",
"MQTT_PORT": 1883, "MQTT_PORT": 1883,
"MQTT_USERNAME": "", "MQTT_USERNAME": "",
"MQTT_PASSWORD": "", "MQTT_PASSWORD": "",
"MQTT_TLS": false, "MQTT_TLS": false,
"MQTT_TOPICS": [ "MQTT_TOPICS": [
"test/topic" "edgex/events/#"
], ],
"PROTOCOL_TYPE": "json", "PROTOCOL_TYPE": "json",
"RESPONDER_TYPE": "alarm" "RESPONDER_TYPE": "alarm"

View File

@ -1,11 +1,14 @@
from PySide6.QtWidgets import (QMainWindow, QTextEdit, QPushButton, QVBoxLayout, QWidget, from PySide6.QtWidgets import (QMainWindow, QTextEdit, QPushButton, QVBoxLayout, QWidget,
QLineEdit, QLabel, QComboBox, QHBoxLayout, QMessageBox) QLineEdit, QLabel, QComboBox, QHBoxLayout, QMessageBox)
from PySide6.QtCore import Signal, Slot
from config.settings import Settings from config.settings import Settings
from responder.alarm import AlarmResponder from responder.alarm import AlarmResponder
from responder.auto_reply import AutoReplyResponder from responder.auto_reply import AutoReplyResponder
from responder.device_control import DeviceControlResponder from responder.device_control import DeviceControlResponder
class MainWindow(QMainWindow): class MainWindow(QMainWindow):
log_signal = Signal(str)
def __init__(self, mqtt_client, parser_factory, responder): def __init__(self, mqtt_client, parser_factory, responder):
super().__init__() super().__init__()
self.setWindowTitle("MQTT在线测试平台") self.setWindowTitle("MQTT在线测试平台")
@ -74,6 +77,9 @@ class MainWindow(QMainWindow):
container.setLayout(layout) container.setLayout(layout)
self.setCentralWidget(container) self.setCentralWidget(container)
# 信号连接
self.log_signal.connect(self.append_log)
def connect_mqtt(self): def connect_mqtt(self):
self.apply_config() self.apply_config()
self.mqtt_client.set_on_message(self.on_message) self.mqtt_client.set_on_message(self.on_message)
@ -89,15 +95,22 @@ class MainWindow(QMainWindow):
self.append_log("已断开MQTT") self.append_log("已断开MQTT")
def on_message(self, client, userdata, msg): def on_message(self, client, userdata, msg):
# 通过信号安全地传递到主线程
try:
raw = msg.payload.decode(errors='replace')
except Exception:
raw = str(msg.payload)
self.log_signal.emit(f"收到原始消息: {raw}")
try: try:
parser = self.parser_factory.get_parser(Settings.PROTOCOL_TYPE) parser = self.parser_factory.get_parser(Settings.PROTOCOL_TYPE)
parsed = parser.parse(msg.payload) parsed = parser.parse(msg.payload)
self.append_log(f"收到消息: {parsed}") self.log_signal.emit(f"收到消息: {parsed}")
responder = self.get_responder() responder = self.get_responder()
responder.respond(parsed) responder.respond(parsed)
except Exception as e: except Exception as e:
self.append_log(f"解析错误: {e}") self.log_signal.emit(f"解析错误: {e}")
@Slot(str)
def append_log(self, text): def append_log(self, text):
self.log.append(text) self.log.append(text)
self.text_edit.append(text) self.text_edit.append(text)